Whamcloud - gitweb
LU-5248 osd: NOT inject OBD_FAIL_FID_LOOKUP on dotdot
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/osd/osd_handler.c
37  *
38  * Top-level entry points into osd module
39  *
40  * Author: Nikita Danilov <nikita@clusterfs.com>
41  *         Pravin Shelar <pravin.shelar@sun.com> : Added fid in dirent
42  */
43
44 #define DEBUG_SUBSYSTEM S_MDS
45
46 #include <linux/module.h>
47
48 /* LUSTRE_VERSION_CODE */
49 #include <lustre_ver.h>
50 /* prerequisite for linux/xattr.h */
51 #include <linux/types.h>
52 /* prerequisite for linux/xattr.h */
53 #include <linux/fs.h>
54 /* XATTR_{REPLACE,CREATE} */
55 #include <linux/xattr.h>
56
57 /*
58  * struct OBD_{ALLOC,FREE}*()
59  * OBD_FAIL_CHECK
60  */
61 #include <obd_support.h>
62 /* struct ptlrpc_thread */
63 #include <lustre_net.h>
64 #include <lustre_fid.h>
65 /* process_config */
66 #include <lustre_param.h>
67
68 #include "osd_internal.h"
69 #include "osd_dynlocks.h"
70
71 /* llo_* api support */
72 #include <md_object.h>
73 #include <lustre_quota.h>
74
75 #include <ldiskfs/xattr.h>
76
/* Module parameter (mode 0644, default 1): "ldiskfs with parallel directory
 * operations". When non-zero, osd_fid_lookup() allocates an htree lock head
 * for every directory object it attaches an inode to. */
int ldiskfs_pdo = 1;
CFS_MODULE_PARM(ldiskfs_pdo, "i", int, 0644,
                "ldiskfs with parallel directory operations");

/* Module parameter (mode 0644, default 0): "LBUG during tracking of
 * declares". Its consumers are not in this part of the file. */
int ldiskfs_track_declares_assert;
CFS_MODULE_PARM(ldiskfs_track_declares_assert, "i", int, 0644,
                "LBUG during tracking of declares");

/* Slab to allocate dynlocks */
struct kmem_cache *dynlock_cachep;

/* Slab cache descriptors owned by this module. The last entry has a NULL
 * ckd_cache; presumably that is the list terminator -- confirm against the
 * code that walks this table. */
static struct lu_kmem_descr ldiskfs_caches[] = {
        {
                .ckd_cache = &dynlock_cachep,
                .ckd_name  = "dynlock_cache",
                .ckd_size  = sizeof(struct dynlock_handle)
        },
        {
                .ckd_cache = NULL
        }
};

/* Well-known entry names used by the directory/index code. */
static const char dot[] = ".";
static const char dotdot[] = "..";
static const char remote_obj_dir[] = "REM_OBJ_DIR";

/* Forward declarations of the operation tables; osd_object_alloc() and
 * osd_object_init() reference some of them before their definitions. */
static const struct lu_object_operations      osd_lu_obj_ops;
static const struct dt_object_operations      osd_obj_ops;
static const struct dt_object_operations      osd_obj_ea_ops;
static const struct dt_object_operations      osd_obj_otable_it_ops;
static const struct dt_index_operations       osd_index_iam_ops;
static const struct dt_index_operations       osd_index_ea_ops;
109
/*
 * Table indexed by OSD_OT_* operation type that names a second operation
 * type for each entry. Paired operations point at each other
 * (CREATE <-> DESTROY, REF_ADD <-> REF_DEL, INSERT <-> DELETE), ATTR_SET,
 * XATTR_SET and WRITE point at themselves, and PUNCH, UPDATE and QUOTA map
 * to OSD_OT_MAX.
 *
 * NOTE(review): "rb" presumably stands for "rollback" and OSD_OT_MAX for
 * "no counterpart" -- confirm against the users of this table, which are
 * not in this part of the file.
 */
int osd_trans_declare_op2rb[] = {
        [OSD_OT_ATTR_SET]       = OSD_OT_ATTR_SET,
        [OSD_OT_PUNCH]          = OSD_OT_MAX,
        [OSD_OT_XATTR_SET]      = OSD_OT_XATTR_SET,
        [OSD_OT_CREATE]         = OSD_OT_DESTROY,
        [OSD_OT_DESTROY]        = OSD_OT_CREATE,
        [OSD_OT_REF_ADD]        = OSD_OT_REF_DEL,
        [OSD_OT_REF_DEL]        = OSD_OT_REF_ADD,
        [OSD_OT_WRITE]          = OSD_OT_WRITE,
        [OSD_OT_INSERT]         = OSD_OT_DELETE,
        [OSD_OT_DELETE]         = OSD_OT_INSERT,
        [OSD_OT_UPDATE]         = OSD_OT_MAX,
        [OSD_OT_QUOTA]          = OSD_OT_MAX,
};
124
125 static int osd_has_index(const struct osd_object *obj)
126 {
127         return obj->oo_dt.do_index_ops != NULL;
128 }
129
/**
 * lu_object flavour of the invariant check: converts \a l to its
 * osd_object and returns whatever osd_invariant() reports for it.
 */
static int osd_object_invariant(const struct lu_object *l)
{
        int holds;

        holds = osd_invariant(osd_obj(l));
        return holds;
}
134
135 /*
136  * Concurrency: doesn't matter
137  */
138 static int osd_read_locked(const struct lu_env *env, struct osd_object *o)
139 {
140         return osd_oti_get(env)->oti_r_locks > 0;
141 }
142
143 /*
144  * Concurrency: doesn't matter
145  */
146 static int osd_write_locked(const struct lu_env *env, struct osd_object *o)
147 {
148         struct osd_thread_info *oti = osd_oti_get(env);
149         return oti->oti_w_locks > 0 && o->oo_owner == env;
150 }
151
152 /*
153  * Concurrency: doesn't access mutable data
154  */
155 static int osd_root_get(const struct lu_env *env,
156                         struct dt_device *dev, struct lu_fid *f)
157 {
158         lu_local_obj_fid(f, OSD_FS_ROOT_OID);
159         return 0;
160 }
161
162 /*
163  * OSD object methods.
164  */
165
166 /*
167  * Concurrency: no concurrent access is possible that early in object
168  * life-cycle.
169  */
170 static struct lu_object *osd_object_alloc(const struct lu_env *env,
171                                           const struct lu_object_header *hdr,
172                                           struct lu_device *d)
173 {
174         struct osd_object *mo;
175
176         OBD_ALLOC_PTR(mo);
177         if (mo != NULL) {
178                 struct lu_object *l;
179
180                 l = &mo->oo_dt.do_lu;
181                 dt_object_init(&mo->oo_dt, NULL, d);
182                 mo->oo_dt.do_ops = &osd_obj_ea_ops;
183                 l->lo_ops = &osd_lu_obj_ops;
184                 init_rwsem(&mo->oo_sem);
185                 init_rwsem(&mo->oo_ext_idx_sem);
186                 spin_lock_init(&mo->oo_guard);
187                 return l;
188         } else {
189                 return NULL;
190         }
191 }
192
193 int osd_get_lma(struct osd_thread_info *info, struct inode *inode,
194                 struct dentry *dentry, struct lustre_mdt_attrs *lma)
195 {
196         int rc;
197
198         CLASSERT(LMA_OLD_SIZE >= sizeof(*lma));
199         rc = __osd_xattr_get(inode, dentry, XATTR_NAME_LMA,
200                              info->oti_mdt_attrs_old, LMA_OLD_SIZE);
201         if (rc > 0) {
202                 if ((void *)lma != (void *)info->oti_mdt_attrs_old)
203                         memcpy(lma, info->oti_mdt_attrs_old, sizeof(*lma));
204                 rc = 0;
205                 lustre_lma_swab(lma);
206                 /* Check LMA compatibility */
207                 if (lma->lma_incompat & ~LMA_INCOMPAT_SUPP) {
208                         CWARN("%.16s: unsupported incompat LMA feature(s) %#x "
209                               "for fid = "DFID", ino = %lu\n",
210                               LDISKFS_SB(inode->i_sb)->s_es->s_volume_name,
211                               lma->lma_incompat & ~LMA_INCOMPAT_SUPP,
212                               PFID(&lma->lma_self_fid), inode->i_ino);
213                         rc = -EOPNOTSUPP;
214                 }
215         } else if (rc == 0) {
216                 rc = -ENODATA;
217         }
218
219         return rc;
220 }
221
222 /*
223  * retrieve object from backend ext fs.
224  **/
225 struct inode *osd_iget(struct osd_thread_info *info, struct osd_device *dev,
226                        struct osd_inode_id *id)
227 {
228         struct inode *inode = NULL;
229
230         inode = ldiskfs_iget(osd_sb(dev), id->oii_ino);
231         if (IS_ERR(inode)) {
232                 CDEBUG(D_INODE, "no inode: ino = %u, rc = %ld\n",
233                        id->oii_ino, PTR_ERR(inode));
234         } else if (id->oii_gen != OSD_OII_NOGEN &&
235                    inode->i_generation != id->oii_gen) {
236                 CDEBUG(D_INODE, "unmatched inode: ino = %u, oii_gen = %u, "
237                        "i_generation = %u\n",
238                        id->oii_ino, id->oii_gen, inode->i_generation);
239                 iput(inode);
240                 inode = ERR_PTR(-ESTALE);
241         } else if (inode->i_nlink == 0) {
242                 /* due to parallel readdir and unlink,
243                 * we can have dead inode here. */
244                 CDEBUG(D_INODE, "stale inode: ino = %u\n", id->oii_ino);
245                 iput(inode);
246                 inode = ERR_PTR(-ESTALE);
247         } else if (is_bad_inode(inode)) {
248                 CWARN("%.16s: bad inode: ino = %u\n",
249                 LDISKFS_SB(osd_sb(dev))->s_es->s_volume_name, id->oii_ino);
250                 iput(inode);
251                 inode = ERR_PTR(-ENOENT);
252         } else {
253                 if (id->oii_gen == OSD_OII_NOGEN)
254                         osd_id_gen(id, inode->i_ino, inode->i_generation);
255
256                 /* Do not update file c/mtime in ldiskfs.
257                  * NB: we don't have any lock to protect this because we don't
258                  * have reference on osd_object now, but contention with
259                  * another lookup + attr_set can't happen in the tiny window
260                  * between if (...) and set S_NOCMTIME. */
261                 if (!(inode->i_flags & S_NOCMTIME))
262                         inode->i_flags |= S_NOCMTIME;
263         }
264         return inode;
265 }
266
267 static struct inode *
268 osd_iget_fid(struct osd_thread_info *info, struct osd_device *dev,
269              struct osd_inode_id *id, struct lu_fid *fid)
270 {
271         struct lustre_mdt_attrs *lma   = &info->oti_mdt_attrs;
272         struct inode            *inode;
273         int                      rc;
274
275         inode = osd_iget(info, dev, id);
276         if (IS_ERR(inode))
277                 return inode;
278
279         rc = osd_get_lma(info, inode, &info->oti_obj_dentry, lma);
280         if (rc == 0) {
281                 *fid = lma->lma_self_fid;
282         } else if (rc == -ENODATA) {
283                 if (unlikely(inode == osd_sb(dev)->s_root->d_inode))
284                         lu_local_obj_fid(fid, OSD_FS_ROOT_OID);
285                 else
286                         lu_igif_build(fid, inode->i_ino, inode->i_generation);
287         } else {
288                 iput(inode);
289                 inode = ERR_PTR(rc);
290         }
291         return inode;
292 }
293
/**
 * Load the inode named by \a id and validate it against \a id.
 *
 * The inode is rejected when it cannot be read, is marked bad, has a
 * generation different from id->oii_gen (unless that is OSD_OII_NOGEN), or
 * has i_nlink == 0.
 *
 * When \a in_oi is false a rejected inode is reported right away. When
 * \a in_oi is true (and, for a failed read, the error is -ENOENT or
 * -ESTALE) the OI is looked up once more for \a fid to decide what the
 * rejection means; see the comment at the check_oi label.
 *
 * \param in_oi  true if \a id was obtained from the OI files
 *
 * \retval valid inode        on success, with S_NOCMTIME set and \a id's
 *                            generation filled in if it was OSD_OII_NOGEN
 * \retval ERR_PTR(-ENOENT)   object does not exist
 * \retval ERR_PTR(-ESTALE)   generation mismatch with \a in_oi false
 * \retval ERR_PTR(-EREMCHG)  the OI still maps \a fid but the inode does
 *                            not match the mapping
 * \retval other ERR_PTR      failure from ldiskfs_iget() or osd_oi_lookup()
 */
static struct inode *osd_iget_check(struct osd_thread_info *info,
                                    struct osd_device *dev,
                                    const struct lu_fid *fid,
                                    struct osd_inode_id *id,
                                    bool in_oi)
{
        struct inode    *inode;
        int              rc     = 0;
        ENTRY;

        inode = ldiskfs_iget(osd_sb(dev), id->oii_ino);
        if (IS_ERR(inode)) {
                rc = PTR_ERR(inode);
                /* Only -ENOENT/-ESTALE on an OI-provided mapping are worth
                 * re-checking against the OI; everything else is final. */
                if (!in_oi || (rc != -ENOENT && rc != -ESTALE)) {
                        CDEBUG(D_INODE, "no inode: ino = %u, rc = %d\n",
                               id->oii_ino, rc);

                        GOTO(put, rc);
                }

                goto check_oi;
        }

        if (is_bad_inode(inode)) {
                rc = -ENOENT;
                if (!in_oi) {
                        CDEBUG(D_INODE, "bad inode: ino = %u\n", id->oii_ino);

                        GOTO(put, rc);
                }

                goto check_oi;
        }

        if (id->oii_gen != OSD_OII_NOGEN &&
            inode->i_generation != id->oii_gen) {
                rc = -ESTALE;
                if (!in_oi) {
                        CDEBUG(D_INODE, "unmatched inode: ino = %u, "
                               "oii_gen = %u, i_generation = %u\n",
                               id->oii_ino, id->oii_gen, inode->i_generation);

                        GOTO(put, rc);
                }

                goto check_oi;
        }

        if (inode->i_nlink == 0) {
                rc = -ENOENT;
                if (!in_oi) {
                        CDEBUG(D_INODE, "stale inode: ino = %u\n", id->oii_ino);

                        GOTO(put, rc);
                }

                goto check_oi;
        }

check_oi:
        if (rc != 0) {
                LASSERTF(rc == -ESTALE || rc == -ENOENT, "rc = %d\n", rc);

                /* Note: this lookup rewrites *id with the current mapping. */
                rc = osd_oi_lookup(info, dev, fid, id, OI_CHECK_FLD);
                /* XXX: There are four possible cases:
                 *      1. rc = 0.
                 *         Backup/restore caused the OI invalid.
                 *      2. rc = 0.
                 *         Someone unlinked the object but NOT removed
                 *         the OI mapping, such as mount target device
                 *         as ldiskfs, and modify something directly.
                 *      3. rc = -ENOENT.
                 *         Someone just removed the object between the
                 *         former oi_lookup and the iget. It is normal.
                 *      4. Other failure cases.
                 *
                 *      Generally, when the device is mounted, it will
                 *      auto check whether the system is restored from
                 *      file-level backup or not. We trust such detect
                 *      to distinguish the 1st case from the 2nd case. */
                if (rc == 0) {
                        /* The inode was read and its non-zero generation
                         * matches the refreshed mapping: treat the object
                         * as gone. Anything else means the OI mapping and
                         * the inode disagree. */
                        if (!IS_ERR(inode) && inode->i_generation != 0 &&
                            inode->i_generation == id->oii_gen)
                                rc = -ENOENT;
                        else
                                rc = -EREMCHG;
                }
        } else {
                if (id->oii_gen == OSD_OII_NOGEN)
                        osd_id_gen(id, inode->i_ino, inode->i_generation);

                /* Do not update file c/mtime in ldiskfs.
                 * NB: we don't have any lock to protect this because we don't
                 * have reference on osd_object now, but contention with
                 * another lookup + attr_set can't happen in the tiny window
                 * between if (...) and set S_NOCMTIME. */
                if (!(inode->i_flags & S_NOCMTIME))
                        inode->i_flags |= S_NOCMTIME;
        }

        GOTO(put, rc);

put:
        /* On any failure drop the reference (if one was taken) and hand the
         * error back encoded in the pointer. */
        if (rc != 0) {
                if (!IS_ERR(inode))
                        iput(inode);

                inode = ERR_PTR(rc);
        }

        return inode;
}
406
407 /**
408  * \retval +v: new filter_fid, does not contain self-fid
409  * \retval 0:  filter_fid_old, contains self-fid
410  * \retval -v: other failure cases
411  */
412 int osd_get_idif(struct osd_thread_info *info, struct inode *inode,
413                  struct dentry *dentry, struct lu_fid *fid)
414 {
415         struct filter_fid_old   *ff     = &info->oti_ff;
416         struct ost_id           *ostid  = &info->oti_ostid;
417         int                      rc;
418
419         rc = __osd_xattr_get(inode, dentry, XATTR_NAME_FID, ff, sizeof(*ff));
420         if (rc == sizeof(*ff)) {
421                 rc = 0;
422                 ostid_set_seq(ostid, le64_to_cpu(ff->ff_seq));
423                 ostid_set_id(ostid, le64_to_cpu(ff->ff_objid));
424                 /* XXX: should use real OST index in the future. LU-3569 */
425                 ostid_to_fid(fid, ostid, 0);
426         } else if (rc == sizeof(struct filter_fid)) {
427                 rc = 1;
428         } else if (rc >= 0) {
429                 rc = -EINVAL;
430         }
431
432         return rc;
433 }
434
435 static int osd_check_lma(const struct lu_env *env, struct osd_object *obj)
436 {
437         struct osd_thread_info  *info   = osd_oti_get(env);
438         struct lustre_mdt_attrs *lma    = &info->oti_mdt_attrs;
439         struct inode            *inode  = obj->oo_inode;
440         struct dentry           *dentry = &info->oti_obj_dentry;
441         struct lu_fid           *fid    = NULL;
442         int                      rc;
443         ENTRY;
444
445         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_COMPAT_INVALID_ENTRY))
446                 RETURN(0);
447
448         CLASSERT(LMA_OLD_SIZE >= sizeof(*lma));
449         rc = __osd_xattr_get(inode, dentry, XATTR_NAME_LMA,
450                              info->oti_mdt_attrs_old, LMA_OLD_SIZE);
451         if (rc == -ENODATA && !fid_is_igif(lu_object_fid(&obj->oo_dt.do_lu)) &&
452             osd_obj2dev(obj)->od_check_ff) {
453                 fid = &lma->lma_self_fid;
454                 rc = osd_get_idif(info, inode, dentry, fid);
455                 if (rc > 0)
456                         RETURN(0);
457         }
458
459         if (unlikely(rc == -ENODATA))
460                 RETURN(0);
461
462         if (rc < 0)
463                 RETURN(rc);
464
465         if (rc > 0) {
466                 rc = 0;
467                 lustre_lma_swab(lma);
468                 if (unlikely((lma->lma_incompat & ~LMA_INCOMPAT_SUPP) ||
469                              CFS_FAIL_CHECK(OBD_FAIL_OSD_LMA_INCOMPAT))) {
470                         CWARN("%s: unsupported incompat LMA feature(s) %#x for "
471                               "fid = "DFID", ino = %lu\n",
472                               osd_obj2dev(obj)->od_svname,
473                               lma->lma_incompat & ~LMA_INCOMPAT_SUPP,
474                               PFID(lu_object_fid(&obj->oo_dt.do_lu)),
475                               inode->i_ino);
476                         rc = -EOPNOTSUPP;
477                 } else if (!(lma->lma_compat & LMAC_NOT_IN_OI)) {
478                         fid = &lma->lma_self_fid;
479                 }
480         }
481
482         if (fid != NULL &&
483             unlikely(!lu_fid_eq(lu_object_fid(&obj->oo_dt.do_lu), fid))) {
484                 CDEBUG(D_INODE, "%s: FID "DFID" != self_fid "DFID"\n",
485                        osd_obj2dev(obj)->od_svname,
486                        PFID(lu_object_fid(&obj->oo_dt.do_lu)),
487                        PFID(&lma->lma_self_fid));
488                 rc = -EREMCHG;
489         }
490
491         RETURN(rc);
492 }
493
/**
 * Locate the inode that backs \a fid and attach it to \a obj.
 *
 * The FID-to-inode mapping is searched in this order: the per-thread cache,
 * the OI scrub pending list (when it is not empty), then the OI files. The
 * inode is loaded with osd_iget_check() and verified with osd_check_lma().
 * When the mapping looks inconsistent the code at the "trigger" label
 * reports or starts the OI scrub and, as a last resort, looks the FID up
 * under the remote parent directory; that fallback is attempted at most
 * once per call.
 *
 * For directories, when ldiskfs_pdo is set, an htree lock head is
 * allocated as well.
 *
 * \param conf  optional; with LOC_F_NEW set no lookup is done at all
 *
 * \retval 0             obj->oo_inode is set, or is left NULL when the
 *                       object does not exist yet or LOC_F_NEW was given
 * \retval -ENOENT       the object is gone (or the OBD_FAIL_OST_ENOENT
 *                       fail point fired)
 * \retval -EINPROGRESS  an OI scrub is running or has just been started
 * \retval -EREMCHG      inconsistent mapping and the scrub failed to start
 * \retval -ENOMEM       htree lock head allocation failed
 * \retval negative      other lookup failures
 */
static int osd_fid_lookup(const struct lu_env *env, struct osd_object *obj,
                          const struct lu_fid *fid,
                          const struct lu_object_conf *conf)
{
        struct osd_thread_info *info;
        struct lu_device       *ldev   = obj->oo_dt.do_lu.lo_dev;
        struct osd_device      *dev;
        struct osd_idmap_cache *oic;
        struct osd_inode_id    *id;
        struct inode           *inode;
        struct osd_scrub       *scrub;
        struct scrub_file      *sf;
        int                     result;
        int                     saved  = 0;
        bool                    in_oi  = false;
        bool                    triggered = false;
        ENTRY;

        LINVRNT(osd_invariant(obj));
        LASSERT(obj->oo_inode == NULL);
        LASSERTF(fid_is_sane(fid) || fid_is_idif(fid), DFID, PFID(fid));

        dev = osd_dev(ldev);
        scrub = &dev->od_scrub;
        sf = &scrub->os_file;
        info = osd_oti_get(env);
        LASSERT(info);
        oic = &info->oti_cache;

        if (OBD_FAIL_CHECK(OBD_FAIL_OST_ENOENT))
                RETURN(-ENOENT);

        /* For the object is created as locking anchor, or for the object to
         * be created on disk. No need to osd_oi_lookup() at here because FID
         * should never be re-used, if it's really a duplicate FID from
         * unexpected reason, we should be able to detect it later by calling
         * do_create->osd_oi_insert(). */
        if (conf != NULL && conf->loc_flags & LOC_F_NEW)
                GOTO(out, result = 0);

        /* Search order: 1. per-thread cache. */
        if (lu_fid_eq(fid, &oic->oic_fid) &&
            likely(oic->oic_dev == dev)) {
                id = &oic->oic_lid;
                goto iget;
        }

        id = &info->oti_id;
        if (!cfs_list_empty(&scrub->os_inconsistent_items)) {
                /* Search order: 2. OI scrub pending list. */
                result = osd_oii_lookup(dev, fid, id);
                if (result == 0)
                        goto iget;
        }

        /* Search order: 3. OI files. */
        result = osd_oi_lookup(info, dev, fid, id, OI_CHECK_FLD);
        if (result == -ENOENT) {
                /* Not in the OI. That is reported as "does not exist"
                 * (result 0, no inode) unless this is a normal FID, not on
                 * an OST, whose OI file is flagged in the scrub bitmap --
                 * then the mapping may merely be missing, so go to trigger. */
                if (!fid_is_norm(fid) ||
                    fid_is_on_ost(info, dev, fid, OI_CHECK_FLD) ||
                    !ldiskfs_test_bit(osd_oi_fid2idx(dev,fid),
                                      sf->sf_oi_bitmap))
                        GOTO(out, result = 0);

                goto trigger;
        }

        if (result != 0)
                GOTO(out, result);

        /* From here on *id is known to come from the OI files. */
        in_oi = true;

iget:
        inode = osd_iget_check(info, dev, fid, id, in_oi);
        if (IS_ERR(inode)) {
                result = PTR_ERR(inode);
                if (result == -ENOENT || result == -ESTALE) {
                        /* A stale mapping that did not come from the OI is
                         * dropped from the per-thread cache. */
                        if (!in_oi)
                                fid_zero(&oic->oic_fid);

                        GOTO(out, result = -ENOENT);
                } else if (result == -EREMCHG) {

                /* Also entered by goto from the OI lookup above and from
                 * the osd_check_lma() failure path below. */
trigger:
                        if (!in_oi)
                                fid_zero(&oic->oic_fid);

                        /* Second visit: the remote-parent fallback has
                         * already been tried, report the first verdict. */
                        if (unlikely(triggered))
                                GOTO(out, result = saved);

                        triggered = true;
                        if (thread_is_running(&scrub->os_thread)) {
                                result = -EINPROGRESS;
                        } else if (!dev->od_noscrub) {
                                result = osd_scrub_start(dev);
                                LCONSOLE_WARN("%.16s: trigger OI scrub by RPC "
                                              "for "DFID", rc = %d [1]\n",
                                              osd_name(dev), PFID(fid), result);
                                if (result == 0 || result == -EALREADY)
                                        result = -EINPROGRESS;
                                else
                                        result = -EREMCHG;
                        }
                        /* NOTE(review): with the scrub thread idle and
                         * od_noscrub set, result keeps whatever value it
                         * had on arrival at this label. */

                        /* We still have chance to get the valid inode: for the
                         * object which is referenced by remote name entry, the
                         * object on the local MDT will be linked under the dir
                         * of "/REMOTE_PARENT_DIR" with its FID string as name.
                         *
                         * We do not know whether the object for the given FID
                         * is referenced by some remote name entry or not, and
                         * especially for DNE II, a multiple-linked object may
                         * have many name entries reside on many MDTs.
                         *
                         * To simplify the operation, OSD will not distinguish
                         * more, just lookup "/REMOTE_PARENT_DIR". Usually, it
                         * only happened for the RPC from other MDT during the
                         * OI scrub, or for the client side RPC with FID only,
                         * such as FID to path, or from old connected client. */
                        saved = result;
                        result = osd_lookup_in_remote_parent(info, dev,
                                                             fid, id);
                        if (result == 0) {
                                /* *id now comes from the remote parent
                                 * directory, not from the OI. */
                                in_oi = false;
                                goto iget;
                        }

                        result = saved;
                }

                GOTO(out, result);
        }

        obj->oo_inode = inode;
        LASSERT(obj->oo_inode->i_sb == osd_sb(dev));

        result = osd_check_lma(env, obj);
        if (result != 0) {
                iput(inode);
                obj->oo_inode = NULL;
                if (result == -EREMCHG) {
                        /* The inode carries a different FID. If the mapping
                         * did not come from the OI, ask the OI before
                         * deciding that a scrub is needed. */
                        if (!in_oi) {
                                result = osd_oi_lookup(info, dev, fid, id,
                                                       OI_CHECK_FLD);
                                if (result != 0) {
                                        fid_zero(&oic->oic_fid);
                                        GOTO(out, result);
                                }
                        }

                        goto trigger;
                }

                GOTO(out, result);
        }

        obj->oo_compat_dot_created = 1;
        obj->oo_compat_dotdot_created = 1;

        if (!S_ISDIR(inode->i_mode) || !ldiskfs_pdo) /* done */
                GOTO(out, result = 0);

        /* Directory with parallel directory operations enabled. */
        LASSERT(obj->oo_hl_head == NULL);
        obj->oo_hl_head = ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
        if (obj->oo_hl_head == NULL) {
                obj->oo_inode = NULL;
                iput(inode);
                GOTO(out, result = -ENOMEM);
        }
        GOTO(out, result = 0);

out:
        LINVRNT(osd_invariant(obj));
        return result;
}
669
670 /*
671  * Concurrency: shouldn't matter.
672  */
673 static void osd_object_init0(struct osd_object *obj)
674 {
675         LASSERT(obj->oo_inode != NULL);
676         obj->oo_dt.do_body_ops = &osd_body_ops;
677         obj->oo_dt.do_lu.lo_header->loh_attr |=
678                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
679 }
680
681 /*
682  * Concurrency: no concurrent access is possible that early in object
683  * life-cycle.
684  */
685 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
686                            const struct lu_object_conf *conf)
687 {
688         struct osd_object *obj = osd_obj(l);
689         int result;
690
691         LINVRNT(osd_invariant(obj));
692
693         if (fid_is_otable_it(&l->lo_header->loh_fid)) {
694                 obj->oo_dt.do_ops = &osd_obj_otable_it_ops;
695                 l->lo_header->loh_attr |= LOHA_EXISTS;
696                 return 0;
697         }
698
699         result = osd_fid_lookup(env, obj, lu_object_fid(l), conf);
700         obj->oo_dt.do_body_ops = &osd_body_ops_new;
701         if (result == 0 && obj->oo_inode != NULL)
702                 osd_object_init0(obj);
703
704         LINVRNT(osd_invariant(obj));
705         return result;
706 }
707
708 /*
709  * Concurrency: no concurrent access is possible that late in object
710  * life-cycle.
711  */
712 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
713 {
714         struct osd_object *obj = osd_obj(l);
715
716         LINVRNT(osd_invariant(obj));
717
718         dt_object_fini(&obj->oo_dt);
719         if (obj->oo_hl_head != NULL)
720                 ldiskfs_htree_lock_head_free(obj->oo_hl_head);
721         OBD_FREE_PTR(obj);
722 }
723
724 /*
725  * Concurrency: no concurrent access is possible that late in object
726  * life-cycle.
727  */
728 static void osd_index_fini(struct osd_object *o)
729 {
730         struct iam_container *bag;
731
732         if (o->oo_dir != NULL) {
733                 bag = &o->oo_dir->od_container;
734                 if (o->oo_inode != NULL) {
735                         if (bag->ic_object == o->oo_inode)
736                                 iam_container_fini(bag);
737                 }
738                 OBD_FREE_PTR(o->oo_dir);
739                 o->oo_dir = NULL;
740         }
741 }
742
743 /*
744  * Concurrency: no concurrent access is possible that late in object
745  * life-cycle (for all existing callers, that is. New callers have to provide
746  * their own locking.)
747  */
748 static int osd_inode_unlinked(const struct inode *inode)
749 {
750         return inode->i_nlink == 0;
751 }
752
/* Fixed credit counts for deleting an OI entry and for deleting an inode.
 * NOTE(review): presumably journal transaction credits -- the users of
 * these constants are not in this part of the file. */
enum {
        OSD_TXN_OI_DELETE_CREDITS    = 20,
        OSD_TXN_INODE_DELETE_CREDITS = 20
};
757
758 /*
759  * Journal
760  */
761
762 #if OSD_THANDLE_STATS
763 /**
764  * Set time when the handle is allocated
765  */
766 static void osd_th_alloced(struct osd_thandle *oth)
767 {
768         oth->oth_alloced = cfs_time_current();
769 }
770
771 /**
772  * Set time when the handle started
773  */
774 static void osd_th_started(struct osd_thandle *oth)
775 {
776         oth->oth_started = cfs_time_current();
777 }
778
779 /**
780  * Helper function to convert time interval to microseconds packed in
781  * long int.
782  */
783 static long interval_to_usec(cfs_time_t start, cfs_time_t end)
784 {
785         struct timeval val;
786
787         cfs_duration_usec(cfs_time_sub(end, start), &val);
788         return val.tv_sec * 1000000 + val.tv_usec;
789 }
790
791 /**
792  * Check whether the we deal with this handle for too long.
793  */
794 static void __osd_th_check_slow(void *oth, struct osd_device *dev,
795                                 cfs_time_t alloced, cfs_time_t started,
796                                 cfs_time_t closed)
797 {
798         cfs_time_t now = cfs_time_current();
799
800         LASSERT(dev != NULL);
801
802         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_STARTING,
803                             interval_to_usec(alloced, started));
804         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_OPEN,
805                             interval_to_usec(started, closed));
806         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_CLOSING,
807                             interval_to_usec(closed, now));
808
809         if (cfs_time_before(cfs_time_add(alloced, cfs_time_seconds(30)), now)) {
810                 CWARN("transaction handle %p was open for too long: "
811                       "now "CFS_TIME_T" ,"
812                       "alloced "CFS_TIME_T" ,"
813                       "started "CFS_TIME_T" ,"
814                       "closed "CFS_TIME_T"\n",
815                       oth, now, alloced, started, closed);
816                 libcfs_debug_dumpstack(NULL);
817         }
818 }
819
820 #define OSD_CHECK_SLOW_TH(oth, dev, expr)                               \
821 {                                                                       \
822         cfs_time_t __closed = cfs_time_current();                       \
823         cfs_time_t __alloced = oth->oth_alloced;                        \
824         cfs_time_t __started = oth->oth_started;                        \
825                                                                         \
826         expr;                                                           \
827         __osd_th_check_slow(oth, dev, __alloced, __started, __closed);  \
828 }
829
830 #else /* OSD_THANDLE_STATS */
831
/*
 * Variants used when OSD_THANDLE_STATS is not defined: the timing hooks
 * expand to nothing and OSD_CHECK_SLOW_TH() merely evaluates \a expr.
 */
#define osd_th_alloced(h)                  ((void)0)
#define osd_th_started(h)                  ((void)0)
#define OSD_CHECK_SLOW_TH(oth, dev, expr)  expr
835
836 #endif /* OSD_THANDLE_STATS */
837
838 /*
839  * Concurrency: doesn't access mutable data.
840  */
841 static int osd_param_is_not_sane(const struct osd_device *dev,
842                                  const struct thandle *th)
843 {
844         struct osd_thandle *oh = container_of(th, typeof(*oh), ot_super);
845
846         return oh->ot_credits > osd_journal(dev)->j_max_transaction_buffers;
847 }
848
849 /*
850  * Concurrency: shouldn't matter.
851  */
852 static void osd_trans_commit_cb(struct super_block *sb,
853                                 struct ldiskfs_journal_cb_entry *jcb, int error)
854 {
855         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
856         struct thandle     *th  = &oh->ot_super;
857         struct lu_device   *lud = &th->th_dev->dd_lu_dev;
858         struct dt_txn_commit_cb *dcb, *tmp;
859
860         LASSERT(oh->ot_handle == NULL);
861
862         if (error)
863                 CERROR("transaction @0x%p commit error: %d\n", th, error);
864
865         dt_txn_hook_commit(th);
866
867         /* call per-transaction callbacks if any */
868         cfs_list_for_each_entry_safe(dcb, tmp, &oh->ot_dcb_list, dcb_linkage) {
869                 LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
870                          "commit callback entry: magic=%x name='%s'\n",
871                          dcb->dcb_magic, dcb->dcb_name);
872                 cfs_list_del_init(&dcb->dcb_linkage);
873                 dcb->dcb_func(NULL, th, dcb, error);
874         }
875
876         lu_ref_del_at(&lud->ld_reference, &oh->ot_dev_link, "osd-tx", th);
877         lu_device_put(lud);
878         th->th_dev = NULL;
879
880         lu_context_exit(&th->th_ctx);
881         lu_context_fini(&th->th_ctx);
882         OBD_FREE_PTR(oh);
883 }
884
885 static struct thandle *osd_trans_create(const struct lu_env *env,
886                                         struct dt_device *d)
887 {
888         struct osd_thread_info *oti = osd_oti_get(env);
889         struct osd_iobuf       *iobuf = &oti->oti_iobuf;
890         struct osd_thandle     *oh;
891         struct thandle         *th;
892         ENTRY;
893
894         /* on pending IO in this thread should left from prev. request */
895         LASSERT(cfs_atomic_read(&iobuf->dr_numreqs) == 0);
896
897         th = ERR_PTR(-ENOMEM);
898         OBD_ALLOC_GFP(oh, sizeof *oh, GFP_NOFS);
899         if (oh != NULL) {
900                 oh->ot_quota_trans = &oti->oti_quota_trans;
901                 memset(oh->ot_quota_trans, 0, sizeof(*oh->ot_quota_trans));
902                 th = &oh->ot_super;
903                 th->th_dev = d;
904                 th->th_result = 0;
905                 th->th_tags = LCT_TX_HANDLE;
906                 oh->ot_credits = 0;
907                 oti->oti_dev = osd_dt_dev(d);
908                 CFS_INIT_LIST_HEAD(&oh->ot_dcb_list);
909                 osd_th_alloced(oh);
910
911                 memset(oti->oti_declare_ops, 0,
912                                         sizeof(oti->oti_declare_ops));
913                 memset(oti->oti_declare_ops_rb, 0,
914                                         sizeof(oti->oti_declare_ops_rb));
915                 memset(oti->oti_declare_ops_cred, 0,
916                                         sizeof(oti->oti_declare_ops_cred));
917                 oti->oti_rollback = false;
918         }
919         RETURN(th);
920 }
921
922 /*
923  * Concurrency: shouldn't matter.
924  */
925 int osd_trans_start(const struct lu_env *env, struct dt_device *d,
926                     struct thandle *th)
927 {
928         struct osd_thread_info *oti = osd_oti_get(env);
929         struct osd_device  *dev = osd_dt_dev(d);
930         handle_t           *jh;
931         struct osd_thandle *oh;
932         int rc;
933
934         ENTRY;
935
936         LASSERT(current->journal_info == NULL);
937
938         oh = container_of0(th, struct osd_thandle, ot_super);
939         LASSERT(oh != NULL);
940         LASSERT(oh->ot_handle == NULL);
941
942         rc = dt_txn_hook_start(env, d, th);
943         if (rc != 0)
944                 GOTO(out, rc);
945
946         if (unlikely(osd_param_is_not_sane(dev, th))) {
947                 static unsigned long last_printed;
948                 static int last_credits;
949
950                 CWARN("%.16s: too many transaction credits (%d > %d)\n",
951                       LDISKFS_SB(osd_sb(dev))->s_es->s_volume_name,
952                       oh->ot_credits,
953                       osd_journal(dev)->j_max_transaction_buffers);
954                 CWARN("  create: %u/%u, destroy: %u/%u\n",
955                       oti->oti_declare_ops[OSD_OT_CREATE],
956                       oti->oti_declare_ops_cred[OSD_OT_CREATE],
957                       oti->oti_declare_ops[OSD_OT_DESTROY],
958                       oti->oti_declare_ops_cred[OSD_OT_DESTROY]);
959                 CWARN("  attr_set: %u/%u, xattr_set: %u/%u\n",
960                       oti->oti_declare_ops[OSD_OT_ATTR_SET],
961                       oti->oti_declare_ops_cred[OSD_OT_ATTR_SET],
962                       oti->oti_declare_ops[OSD_OT_XATTR_SET],
963                       oti->oti_declare_ops_cred[OSD_OT_XATTR_SET]);
964                 CWARN("  write: %u/%u, punch: %u/%u, quota %u/%u\n",
965                       oti->oti_declare_ops[OSD_OT_WRITE],
966                       oti->oti_declare_ops_cred[OSD_OT_WRITE],
967                       oti->oti_declare_ops[OSD_OT_PUNCH],
968                       oti->oti_declare_ops_cred[OSD_OT_PUNCH],
969                       oti->oti_declare_ops[OSD_OT_QUOTA],
970                       oti->oti_declare_ops_cred[OSD_OT_QUOTA]);
971                 CWARN("  insert: %u/%u, delete: %u/%u\n",
972                       oti->oti_declare_ops[OSD_OT_INSERT],
973                       oti->oti_declare_ops_cred[OSD_OT_INSERT],
974                       oti->oti_declare_ops[OSD_OT_DELETE],
975                       oti->oti_declare_ops_cred[OSD_OT_DELETE]);
976                 CWARN("  ref_add: %u/%u, ref_del: %u/%u\n",
977                       oti->oti_declare_ops[OSD_OT_REF_ADD],
978                       oti->oti_declare_ops_cred[OSD_OT_REF_ADD],
979                       oti->oti_declare_ops[OSD_OT_REF_DEL],
980                       oti->oti_declare_ops_cred[OSD_OT_REF_DEL]);
981
982                 if (last_credits != oh->ot_credits &&
983                     time_after(jiffies, last_printed + 60 * HZ)) {
984                         libcfs_debug_dumpstack(NULL);
985                         last_credits = oh->ot_credits;
986                         last_printed = jiffies;
987                 }
988                 /* XXX Limit the credits to 'max_transaction_buffers', and
989                  *     let the underlying filesystem to catch the error if
990                  *     we really need so many credits.
991                  *
992                  *     This should be removed when we can calculate the
993                  *     credits precisely. */
994                 oh->ot_credits = osd_journal(dev)->j_max_transaction_buffers;
995         }
996
997         /*
998          * XXX temporary stuff. Some abstraction layer should
999          * be used.
1000          */
1001         jh = osd_journal_start_sb(osd_sb(dev), LDISKFS_HT_MISC, oh->ot_credits);
1002         osd_th_started(oh);
1003         if (!IS_ERR(jh)) {
1004                 oh->ot_handle = jh;
1005                 LASSERT(oti->oti_txns == 0);
1006                 lu_context_init(&th->th_ctx, th->th_tags);
1007                 lu_context_enter(&th->th_ctx);
1008
1009                 lu_device_get(&d->dd_lu_dev);
1010                 lu_ref_add_at(&d->dd_lu_dev.ld_reference, &oh->ot_dev_link,
1011                               "osd-tx", th);
1012                 oti->oti_txns++;
1013                 rc = 0;
1014         } else {
1015                 rc = PTR_ERR(jh);
1016         }
1017 out:
1018         RETURN(rc);
1019 }
1020
1021 /*
1022  * Concurrency: shouldn't matter.
1023  */
1024 static int osd_trans_stop(const struct lu_env *env, struct thandle *th)
1025 {
1026         int                     rc = 0;
1027         struct osd_thandle     *oh;
1028         struct osd_thread_info *oti = osd_oti_get(env);
1029         struct osd_iobuf       *iobuf = &oti->oti_iobuf;
1030         struct qsd_instance    *qsd = oti->oti_dev->od_quota_slave;
1031         struct lquota_trans    *qtrans;
1032         ENTRY;
1033
1034         oh = container_of0(th, struct osd_thandle, ot_super);
1035
1036         qtrans = oh->ot_quota_trans;
1037         oh->ot_quota_trans = NULL;
1038
1039         if (oh->ot_handle != NULL) {
1040                 handle_t *hdl = oh->ot_handle;
1041
1042                 /*
1043                  * add commit callback
1044                  * notice we don't do this in osd_trans_start()
1045                  * as underlying transaction can change during truncate
1046                  */
1047                 ldiskfs_journal_callback_add(hdl, osd_trans_commit_cb,
1048                                          &oh->ot_jcb);
1049
1050                 LASSERT(oti->oti_txns == 1);
1051                 oti->oti_txns--;
1052                 rc = dt_txn_hook_stop(env, th);
1053                 if (rc != 0)
1054                         CERROR("Failure in transaction hook: %d\n", rc);
1055
1056                 /* hook functions might modify th_sync */
1057                 hdl->h_sync = th->th_sync;
1058
1059                 oh->ot_handle = NULL;
1060                 OSD_CHECK_SLOW_TH(oh, oti->oti_dev,
1061                                   rc = ldiskfs_journal_stop(hdl));
1062                 if (rc != 0)
1063                         CERROR("Failure to stop transaction: %d\n", rc);
1064         } else {
1065                 OBD_FREE_PTR(oh);
1066         }
1067
1068         /* inform the quota slave device that the transaction is stopping */
1069         qsd_op_end(env, qsd, qtrans);
1070
1071         /* as we want IO to journal and data IO be concurrent, we don't block
1072          * awaiting data IO completion in osd_do_bio(), instead we wait here
1073          * once transaction is submitted to the journal. all reqular requests
1074          * don't do direct IO (except read/write), thus this wait_event becomes
1075          * no-op for them.
1076          *
1077          * IMPORTANT: we have to wait till any IO submited by the thread is
1078          * completed otherwise iobuf may be corrupted by different request
1079          */
1080         wait_event(iobuf->dr_wait,
1081                        cfs_atomic_read(&iobuf->dr_numreqs) == 0);
1082         osd_fini_iobuf(oti->oti_dev, iobuf);
1083         if (!rc)
1084                 rc = iobuf->dr_error;
1085
1086         RETURN(rc);
1087 }
1088
1089 static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
1090 {
1091         struct osd_thandle *oh = container_of0(th, struct osd_thandle,
1092                                                ot_super);
1093
1094         LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC);
1095         LASSERT(&dcb->dcb_func != NULL);
1096         cfs_list_add(&dcb->dcb_linkage, &oh->ot_dcb_list);
1097
1098         return 0;
1099 }
1100
1101 /*
1102  * Called just before object is freed. Releases all resources except for
1103  * object itself (that is released by osd_object_free()).
1104  *
1105  * Concurrency: no concurrent access is possible that late in object
1106  * life-cycle.
1107  */
1108 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
1109 {
1110         struct osd_object *obj   = osd_obj(l);
1111         struct inode      *inode = obj->oo_inode;
1112
1113         LINVRNT(osd_invariant(obj));
1114
1115         /*
1116          * If object is unlinked remove fid->ino mapping from object index.
1117          */
1118
1119         osd_index_fini(obj);
1120         if (inode != NULL) {
1121                 struct qsd_instance     *qsd = osd_obj2dev(obj)->od_quota_slave;
1122                 qid_t                    uid = inode->i_uid;
1123                 qid_t                    gid = inode->i_gid;
1124
1125                 iput(inode);
1126                 obj->oo_inode = NULL;
1127
1128                 if (qsd != NULL) {
1129                         struct osd_thread_info  *info = osd_oti_get(env);
1130                         struct lquota_id_info   *qi = &info->oti_qi;
1131
1132                         /* Release granted quota to master if necessary */
1133                         qi->lqi_id.qid_uid = uid;
1134                         qsd_op_adjust(env, qsd, &qi->lqi_id, USRQUOTA);
1135
1136                         qi->lqi_id.qid_uid = gid;
1137                         qsd_op_adjust(env, qsd, &qi->lqi_id, GRPQUOTA);
1138                 }
1139         }
1140 }
1141
/*
 * Object release hook; this OSD has nothing to do at release time.
 *
 * Concurrency: ->loo_object_release() is called under site spin-lock.
 */
static void osd_object_release(const struct lu_env *env,
                               struct lu_object *l)
{
        /* intentionally empty */
}
1149
1150 /*
1151  * Concurrency: shouldn't matter.
1152  */
1153 static int osd_object_print(const struct lu_env *env, void *cookie,
1154                             lu_printer_t p, const struct lu_object *l)
1155 {
1156         struct osd_object *o = osd_obj(l);
1157         struct iam_descr  *d;
1158
1159         if (o->oo_dir != NULL)
1160                 d = o->oo_dir->od_container.ic_descr;
1161         else
1162                 d = NULL;
1163         return (*p)(env, cookie,
1164                     LUSTRE_OSD_LDISKFS_NAME"-object@%p(i:%p:%lu/%u)[%s]",
1165                     o, o->oo_inode,
1166                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
1167                     o->oo_inode ? o->oo_inode->i_generation : 0,
1168                     d ? d->id_ops->id_name : "plain");
1169 }
1170
1171 /*
1172  * Concurrency: shouldn't matter.
1173  */
1174 int osd_statfs(const struct lu_env *env, struct dt_device *d,
1175                struct obd_statfs *sfs)
1176 {
1177         struct osd_device  *osd = osd_dt_dev(d);
1178         struct super_block *sb = osd_sb(osd);
1179         struct kstatfs     *ksfs;
1180         int result = 0;
1181
1182         if (unlikely(osd->od_mnt == NULL))
1183                 return -EINPROGRESS;
1184
1185         /* osd_lproc.c call this without env, allocate ksfs for that case */
1186         if (unlikely(env == NULL)) {
1187                 OBD_ALLOC_PTR(ksfs);
1188                 if (ksfs == NULL)
1189                         return -ENOMEM;
1190         } else {
1191                 ksfs = &osd_oti_get(env)->oti_ksfs;
1192         }
1193
1194         spin_lock(&osd->od_osfs_lock);
1195         /* cache 1 second */
1196         if (cfs_time_before_64(osd->od_osfs_age, cfs_time_shift_64(-1))) {
1197                 result = sb->s_op->statfs(sb->s_root, ksfs);
1198                 if (likely(result == 0)) { /* N.B. statfs can't really fail */
1199                         osd->od_osfs_age = cfs_time_current_64();
1200                         statfs_pack(&osd->od_statfs, ksfs);
1201                         if (sb->s_flags & MS_RDONLY)
1202                                 osd->od_statfs.os_state = OS_STATE_READONLY;
1203                 }
1204         }
1205
1206         if (likely(result == 0))
1207                 *sfs = osd->od_statfs;
1208         spin_unlock(&osd->od_osfs_lock);
1209
1210         if (unlikely(env == NULL))
1211                 OBD_FREE_PTR(ksfs);
1212
1213         return result;
1214 }
1215
/**
 * Estimated space consumed by one file creation.  The longest object name
 * is assumed to be the decimal form of 2^64 - 1, i.e. a filename of
 * 20 characters.
 * This is 28 bytes per object, which is 28MB for 1M objects ... not so bad.
 */
#if defined(__LDISKFS_DIR_REC_LEN)
# define PER_OBJ_USAGE __LDISKFS_DIR_REC_LEN(20)
#else
# define PER_OBJ_USAGE LDISKFS_DIR_REC_LEN(20)
#endif
1226
1227 /*
1228  * Concurrency: doesn't access mutable data.
1229  */
1230 static void osd_conf_get(const struct lu_env *env,
1231                          const struct dt_device *dev,
1232                          struct dt_device_param *param)
1233 {
1234         struct super_block *sb = osd_sb(osd_dt_dev(dev));
1235         int                ea_overhead;
1236
1237         /*
1238          * XXX should be taken from not-yet-existing fs abstraction layer.
1239          */
1240         param->ddp_max_name_len = LDISKFS_NAME_LEN;
1241         param->ddp_max_nlink    = LDISKFS_LINK_MAX;
1242         param->ddp_block_shift  = sb->s_blocksize_bits;
1243         param->ddp_mount_type     = LDD_MT_LDISKFS;
1244         param->ddp_maxbytes       = sb->s_maxbytes;
1245         /* Overhead estimate should be fairly accurate, so we really take a tiny
1246          * error margin which also avoids fragmenting the filesystem too much */
1247         param->ddp_grant_reserved = 2; /* end up to be 1.9% after conversion */
1248         /* inode are statically allocated, so per-inode space consumption
1249          * is the space consumed by the directory entry */
1250         param->ddp_inodespace     = PER_OBJ_USAGE;
1251         /* per-fragment overhead to be used by the client code */
1252         param->ddp_grant_frag     = 6 * LDISKFS_BLOCK_SIZE(sb);
1253         param->ddp_mntopts      = 0;
1254         if (test_opt(sb, XATTR_USER))
1255                 param->ddp_mntopts |= MNTOPT_USERXATTR;
1256         if (test_opt(sb, POSIX_ACL))
1257                 param->ddp_mntopts |= MNTOPT_ACL;
1258
1259         /* LOD might calculate the max stripe count based on max_ea_size,
1260          * so we need take account in the overhead as well,
1261          * xattr_header + magic + xattr_entry_head */
1262         ea_overhead = sizeof(struct ldiskfs_xattr_header) + sizeof(__u32) +
1263                       LDISKFS_XATTR_LEN(XATTR_NAME_MAX_LEN);
1264
1265 #if defined(LDISKFS_FEATURE_INCOMPAT_EA_INODE)
1266         if (LDISKFS_HAS_INCOMPAT_FEATURE(sb, LDISKFS_FEATURE_INCOMPAT_EA_INODE))
1267                 param->ddp_max_ea_size = LDISKFS_XATTR_MAX_LARGE_EA_SIZE -
1268                                                                 ea_overhead;
1269         else
1270 #endif
1271                 param->ddp_max_ea_size = sb->s_blocksize - ea_overhead;
1272 }
1273
1274 /*
1275  * Concurrency: shouldn't matter.
1276  */
1277 static int osd_sync(const struct lu_env *env, struct dt_device *d)
1278 {
1279         CDEBUG(D_HA, "syncing OSD %s\n", LUSTRE_OSD_LDISKFS_NAME);
1280         return ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
1281 }
1282
1283 /**
1284  * Start commit for OSD device.
1285  *
1286  * An implementation of dt_commit_async method for OSD device.
1287  * Asychronously starts underlayng fs sync and thereby a transaction
1288  * commit.
1289  *
1290  * \param env environment
1291  * \param d dt device
1292  *
1293  * \see dt_device_operations
1294  */
1295 static int osd_commit_async(const struct lu_env *env,
1296                             struct dt_device *d)
1297 {
1298         struct super_block *s = osd_sb(osd_dt_dev(d));
1299         ENTRY;
1300
1301         CDEBUG(D_HA, "async commit OSD %s\n", LUSTRE_OSD_LDISKFS_NAME);
1302         RETURN(s->s_op->sync_fs(s, 0));
1303 }
1304
1305 /*
1306  * Concurrency: shouldn't matter.
1307  */
1308
1309 static int osd_ro(const struct lu_env *env, struct dt_device *d)
1310 {
1311         struct super_block *sb = osd_sb(osd_dt_dev(d));
1312         struct block_device *dev = sb->s_bdev;
1313 #ifdef HAVE_DEV_SET_RDONLY
1314         struct block_device *jdev = LDISKFS_SB(sb)->journal_bdev;
1315         int rc = 0;
1316 #else
1317         int rc = -EOPNOTSUPP;
1318 #endif
1319         ENTRY;
1320
1321 #ifdef HAVE_DEV_SET_RDONLY
1322         CERROR("*** setting %s read-only ***\n", osd_dt_dev(d)->od_svname);
1323
1324         if (jdev && (jdev != dev)) {
1325                 CDEBUG(D_IOCTL | D_HA, "set journal dev %lx rdonly\n",
1326                        (long)jdev);
1327                 dev_set_rdonly(jdev);
1328         }
1329         CDEBUG(D_IOCTL | D_HA, "set dev %lx rdonly\n", (long)dev);
1330         dev_set_rdonly(dev);
1331 #else
1332         CERROR("%s: %lx CANNOT BE SET READONLY: rc = %d\n",
1333                osd_dt_dev(d)->od_svname, (long)dev, rc);
1334 #endif
1335         RETURN(rc);
1336 }
1337
1338 /*
1339  * Concurrency: serialization provided by callers.
1340  */
1341 static int osd_init_capa_ctxt(const struct lu_env *env, struct dt_device *d,
1342                               int mode, unsigned long timeout, __u32 alg,
1343                               struct lustre_capa_key *keys)
1344 {
1345         struct osd_device *dev = osd_dt_dev(d);
1346         ENTRY;
1347
1348         dev->od_fl_capa = mode;
1349         dev->od_capa_timeout = timeout;
1350         dev->od_capa_alg = alg;
1351         dev->od_capa_keys = keys;
1352         RETURN(0);
1353 }
1354
1355 /**
1356  * Note: we do not count into QUOTA here.
1357  * If we mount with --data_journal we may need more.
1358  */
1359 const int osd_dto_credits_noquota[DTO_NR] = {
1360         /**
1361          * Insert.
1362          * INDEX_EXTRA_TRANS_BLOCKS(8) +
1363          * SINGLEDATA_TRANS_BLOCKS(8)
1364          * XXX Note: maybe iam need more, since iam have more level than
1365          *           EXT3 htree.
1366          */
1367         [DTO_INDEX_INSERT]  = 16,
1368         /**
1369          * Delete
1370          * just modify a single entry, probably merge few within a block
1371          */
1372         [DTO_INDEX_DELETE]  = 1,
1373         /**
1374          * Used for OI scrub
1375          */
1376         [DTO_INDEX_UPDATE]  = 16,
1377         /**
1378          * 4(inode, inode bits, groups, GDT)
1379          *   notice: OI updates are counted separately with DTO_INDEX_INSERT
1380          */
1381         [DTO_OBJECT_CREATE] = 4,
1382         /**
1383          * 4(inode, inode bits, groups, GDT)
1384          *   notice: OI updates are counted separately with DTO_INDEX_DELETE
1385          */
1386         [DTO_OBJECT_DELETE] = 4,
1387         /**
1388          * Attr set credits (inode)
1389          */
1390         [DTO_ATTR_SET_BASE] = 1,
1391         /**
1392          * Xattr set. The same as xattr of EXT3.
1393          * DATA_TRANS_BLOCKS(14)
1394          * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS
1395          * are also counted in. Do not know why?
1396          */
1397         [DTO_XATTR_SET]     = 14,
1398         /**
1399          * credits for inode change during write.
1400          */
1401         [DTO_WRITE_BASE]    = 3,
1402         /**
1403          * credits for single block write.
1404          */
1405         [DTO_WRITE_BLOCK]   = 14,
1406         /**
1407          * Attr set credits for chown.
1408          * This is extra credits for setattr, and it is null without quota
1409          */
1410         [DTO_ATTR_SET_CHOWN] = 0
1411 };
1412
1413 static const struct dt_device_operations osd_dt_ops = {
1414         .dt_root_get       = osd_root_get,
1415         .dt_statfs         = osd_statfs,
1416         .dt_trans_create   = osd_trans_create,
1417         .dt_trans_start    = osd_trans_start,
1418         .dt_trans_stop     = osd_trans_stop,
1419         .dt_trans_cb_add   = osd_trans_cb_add,
1420         .dt_conf_get       = osd_conf_get,
1421         .dt_sync           = osd_sync,
1422         .dt_ro             = osd_ro,
1423         .dt_commit_async   = osd_commit_async,
1424         .dt_init_capa_ctxt = osd_init_capa_ctxt,
1425 };
1426
1427 static void osd_object_read_lock(const struct lu_env *env,
1428                                  struct dt_object *dt, unsigned role)
1429 {
1430         struct osd_object *obj = osd_dt_obj(dt);
1431         struct osd_thread_info *oti = osd_oti_get(env);
1432
1433         LINVRNT(osd_invariant(obj));
1434
1435         LASSERT(obj->oo_owner != env);
1436         down_read_nested(&obj->oo_sem, role);
1437
1438         LASSERT(obj->oo_owner == NULL);
1439         oti->oti_r_locks++;
1440 }
1441
1442 static void osd_object_write_lock(const struct lu_env *env,
1443                                   struct dt_object *dt, unsigned role)
1444 {
1445         struct osd_object *obj = osd_dt_obj(dt);
1446         struct osd_thread_info *oti = osd_oti_get(env);
1447
1448         LINVRNT(osd_invariant(obj));
1449
1450         LASSERT(obj->oo_owner != env);
1451         down_write_nested(&obj->oo_sem, role);
1452
1453         LASSERT(obj->oo_owner == NULL);
1454         obj->oo_owner = env;
1455         oti->oti_w_locks++;
1456 }
1457
1458 static void osd_object_read_unlock(const struct lu_env *env,
1459                                    struct dt_object *dt)
1460 {
1461         struct osd_object *obj = osd_dt_obj(dt);
1462         struct osd_thread_info *oti = osd_oti_get(env);
1463
1464         LINVRNT(osd_invariant(obj));
1465
1466         LASSERT(oti->oti_r_locks > 0);
1467         oti->oti_r_locks--;
1468         up_read(&obj->oo_sem);
1469 }
1470
1471 static void osd_object_write_unlock(const struct lu_env *env,
1472                                     struct dt_object *dt)
1473 {
1474         struct osd_object *obj = osd_dt_obj(dt);
1475         struct osd_thread_info *oti = osd_oti_get(env);
1476
1477         LINVRNT(osd_invariant(obj));
1478
1479         LASSERT(obj->oo_owner == env);
1480         LASSERT(oti->oti_w_locks > 0);
1481         oti->oti_w_locks--;
1482         obj->oo_owner = NULL;
1483         up_write(&obj->oo_sem);
1484 }
1485
1486 static int osd_object_write_locked(const struct lu_env *env,
1487                                    struct dt_object *dt)
1488 {
1489         struct osd_object *obj = osd_dt_obj(dt);
1490
1491         LINVRNT(osd_invariant(obj));
1492
1493         return obj->oo_owner == env;
1494 }
1495
1496 static int capa_is_sane(const struct lu_env *env,
1497                         struct osd_device *dev,
1498                         struct lustre_capa *capa,
1499                         struct lustre_capa_key *keys)
1500 {
1501         struct osd_thread_info *oti = osd_oti_get(env);
1502         struct lustre_capa *tcapa = &oti->oti_capa;
1503         struct obd_capa *oc;
1504         int i, rc = 0;
1505         ENTRY;
1506
1507         oc = capa_lookup(dev->od_capa_hash, capa, 0);
1508         if (oc) {
1509                 if (capa_is_expired(oc)) {
1510                         DEBUG_CAPA(D_ERROR, capa, "expired");
1511                         rc = -ESTALE;
1512                 }
1513                 capa_put(oc);
1514                 RETURN(rc);
1515         }
1516
1517         if (capa_is_expired_sec(capa)) {
1518                 DEBUG_CAPA(D_ERROR, capa, "expired");
1519                 RETURN(-ESTALE);
1520         }
1521
1522         spin_lock(&capa_lock);
1523         for (i = 0; i < 2; i++) {
1524                 if (keys[i].lk_keyid == capa->lc_keyid) {
1525                         oti->oti_capa_key = keys[i];
1526                         break;
1527                 }
1528         }
1529         spin_unlock(&capa_lock);
1530
1531         if (i == 2) {
1532                 DEBUG_CAPA(D_ERROR, capa, "no matched capa key");
1533                 RETURN(-ESTALE);
1534         }
1535
1536         rc = capa_hmac(tcapa->lc_hmac, capa, oti->oti_capa_key.lk_key);
1537         if (rc)
1538                 RETURN(rc);
1539
1540         if (memcmp(tcapa->lc_hmac, capa->lc_hmac, sizeof(capa->lc_hmac))) {
1541                 DEBUG_CAPA(D_ERROR, capa, "HMAC mismatch");
1542                 RETURN(-EACCES);
1543         }
1544
1545         oc = capa_add(dev->od_capa_hash, capa);
1546         capa_put(oc);
1547
1548         RETURN(0);
1549 }
1550
1551 int osd_object_auth(const struct lu_env *env, struct dt_object *dt,
1552                     struct lustre_capa *capa, __u64 opc)
1553 {
1554         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
1555         struct osd_device *osd = osd_dev(dt->do_lu.lo_dev);
1556         struct lu_capainfo *lci;
1557         int rc;
1558
1559         if (!osd->od_fl_capa)
1560                 return 0;
1561
1562         if (capa == BYPASS_CAPA)
1563                 return 0;
1564
1565         lci = lu_capainfo_get(env);
1566         if (unlikely(lci == NULL))
1567                 return 0;
1568
1569         if (lci->lci_auth == LC_ID_NONE)
1570                 return 0;
1571
1572         if (capa == NULL) {
1573                 CERROR("%s: no capability provided for FID "DFID": rc = %d\n",
1574                        osd_name(osd), PFID(fid), -EACCES);
1575                 return -EACCES;
1576         }
1577
1578         if (!lu_fid_eq(fid, &capa->lc_fid)) {
1579                 DEBUG_CAPA(D_ERROR, capa, "fid "DFID" mismatch with",
1580                            PFID(fid));
1581                 return -EACCES;
1582         }
1583
1584         if (!capa_opc_supported(capa, opc)) {
1585                 DEBUG_CAPA(D_ERROR, capa, "opc "LPX64" not supported by", opc);
1586                 return -EACCES;
1587         }
1588
1589         rc = capa_is_sane(env, osd, capa, osd->od_capa_keys);
1590         if (rc != 0) {
1591                 DEBUG_CAPA(D_ERROR, capa, "insane: rc = %d", rc);
1592                 return -EACCES;
1593         }
1594
1595         return 0;
1596 }
1597
1598 static struct timespec *osd_inode_time(const struct lu_env *env,
1599                                        struct inode *inode, __u64 seconds)
1600 {
1601         struct osd_thread_info  *oti = osd_oti_get(env);
1602         struct timespec         *t   = &oti->oti_time;
1603
1604         t->tv_sec = seconds;
1605         t->tv_nsec = 0;
1606         *t = timespec_trunc(*t, inode->i_sb->s_time_gran);
1607         return t;
1608 }
1609
1610
1611 static void osd_inode_getattr(const struct lu_env *env,
1612                               struct inode *inode, struct lu_attr *attr)
1613 {
1614         attr->la_valid      |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
1615                                LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
1616                                LA_FLAGS | LA_NLINK | LA_RDEV | LA_BLKSIZE |
1617                                LA_TYPE;
1618
1619         attr->la_atime      = LTIME_S(inode->i_atime);
1620         attr->la_mtime      = LTIME_S(inode->i_mtime);
1621         attr->la_ctime      = LTIME_S(inode->i_ctime);
1622         attr->la_mode       = inode->i_mode;
1623         attr->la_size       = i_size_read(inode);
1624         attr->la_blocks     = inode->i_blocks;
1625         attr->la_uid        = inode->i_uid;
1626         attr->la_gid        = inode->i_gid;
1627         attr->la_flags      = LDISKFS_I(inode)->i_flags;
1628         attr->la_nlink      = inode->i_nlink;
1629         attr->la_rdev       = inode->i_rdev;
1630         attr->la_blksize    = 1 << inode->i_blkbits;
1631         attr->la_blkbits    = inode->i_blkbits;
1632 }
1633
1634 static int osd_attr_get(const struct lu_env *env,
1635                         struct dt_object *dt,
1636                         struct lu_attr *attr,
1637                         struct lustre_capa *capa)
1638 {
1639         struct osd_object *obj = osd_dt_obj(dt);
1640
1641         LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
1642         LINVRNT(osd_invariant(obj));
1643
1644         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
1645                 return -EACCES;
1646
1647         spin_lock(&obj->oo_guard);
1648         osd_inode_getattr(env, obj->oo_inode, attr);
1649         spin_unlock(&obj->oo_guard);
1650         return 0;
1651 }
1652
1653 static int osd_declare_attr_set(const struct lu_env *env,
1654                                 struct dt_object *dt,
1655                                 const struct lu_attr *attr,
1656                                 struct thandle *handle)
1657 {
1658         struct osd_thandle     *oh;
1659         struct osd_object      *obj;
1660         struct osd_thread_info *info = osd_oti_get(env);
1661         struct lquota_id_info  *qi = &info->oti_qi;
1662         long long               bspace;
1663         int                     rc = 0;
1664         bool                    enforce;
1665         ENTRY;
1666
1667         LASSERT(dt != NULL);
1668         LASSERT(handle != NULL);
1669
1670         obj = osd_dt_obj(dt);
1671         LASSERT(osd_invariant(obj));
1672
1673         oh = container_of0(handle, struct osd_thandle, ot_super);
1674         LASSERT(oh->ot_handle == NULL);
1675
1676         osd_trans_declare_op(env, oh, OSD_OT_ATTR_SET,
1677                              osd_dto_credits_noquota[DTO_ATTR_SET_BASE]);
1678
1679         if (attr == NULL || obj->oo_inode == NULL)
1680                 RETURN(rc);
1681
1682         bspace   = obj->oo_inode->i_blocks;
1683         bspace <<= obj->oo_inode->i_sb->s_blocksize_bits;
1684         bspace   = toqb(bspace);
1685
1686         /* Changing ownership is always preformed by super user, it should not
1687          * fail with EDQUOT.
1688          *
1689          * We still need to call the osd_declare_qid() to calculate the journal
1690          * credits for updating quota accounting files and to trigger quota
1691          * space adjustment once the operation is completed.*/
1692         if (attr->la_valid & LA_UID || attr->la_valid & LA_GID) {
1693                 /* USERQUOTA */
1694                 qi->lqi_type = USRQUOTA;
1695                 enforce = (attr->la_valid & LA_UID) &&
1696                           (attr->la_uid != obj->oo_inode->i_uid);
1697                 /* inode accounting */
1698                 qi->lqi_is_blk = false;
1699
1700                 /* one more inode for the new uid ... */
1701                 qi->lqi_id.qid_uid = attr->la_uid;
1702                 qi->lqi_space      = 1;
1703                 /* Reserve credits for the new uid */
1704                 rc = osd_declare_qid(env, oh, qi, NULL, enforce, NULL);
1705                 if (rc == -EDQUOT || rc == -EINPROGRESS)
1706                         rc = 0;
1707                 if (rc)
1708                         RETURN(rc);
1709
1710                 /* and one less inode for the current uid */
1711                 qi->lqi_id.qid_uid = obj->oo_inode->i_uid;
1712                 qi->lqi_space      = -1;
1713                 rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
1714                 if (rc == -EDQUOT || rc == -EINPROGRESS)
1715                         rc = 0;
1716                 if (rc)
1717                         RETURN(rc);
1718
1719                 /* block accounting */
1720                 qi->lqi_is_blk = true;
1721
1722                 /* more blocks for the new uid ... */
1723                 qi->lqi_id.qid_uid = attr->la_uid;
1724                 qi->lqi_space      = bspace;
1725                 /*
1726                  * Credits for the new uid has been reserved, re-use "obj"
1727                  * to save credit reservation.
1728                  */
1729                 rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
1730                 if (rc == -EDQUOT || rc == -EINPROGRESS)
1731                         rc = 0;
1732                 if (rc)
1733                         RETURN(rc);
1734
1735                 /* and finally less blocks for the current uid */
1736                 qi->lqi_id.qid_uid = obj->oo_inode->i_uid;
1737                 qi->lqi_space      = -bspace;
1738                 rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
1739                 if (rc == -EDQUOT || rc == -EINPROGRESS)
1740                         rc = 0;
1741                 if (rc)
1742                         RETURN(rc);
1743
1744                 /* GROUP QUOTA */
1745                 qi->lqi_type = GRPQUOTA;
1746                 enforce = (attr->la_valid & LA_GID) &&
1747                           (attr->la_gid != obj->oo_inode->i_gid);
1748
1749                 /* inode accounting */
1750                 qi->lqi_is_blk = false;
1751
1752                 /* one more inode for the new gid ... */
1753                 qi->lqi_id.qid_gid = attr->la_gid;
1754                 qi->lqi_space      = 1;
1755                 rc = osd_declare_qid(env, oh, qi, NULL, enforce, NULL);
1756                 if (rc == -EDQUOT || rc == -EINPROGRESS)
1757                         rc = 0;
1758                 if (rc)
1759                         RETURN(rc);
1760
1761                 /* and one less inode for the current gid */
1762                 qi->lqi_id.qid_gid = obj->oo_inode->i_gid;
1763                 qi->lqi_space      = -1;
1764                 rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
1765                 if (rc == -EDQUOT || rc == -EINPROGRESS)
1766                         rc = 0;
1767                 if (rc)
1768                         RETURN(rc);
1769
1770                 /* block accounting */
1771                 qi->lqi_is_blk = true;
1772
1773                 /* more blocks for the new gid ... */
1774                 qi->lqi_id.qid_gid = attr->la_gid;
1775                 qi->lqi_space      = bspace;
1776                 rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
1777                 if (rc == -EDQUOT || rc == -EINPROGRESS)
1778                         rc = 0;
1779                 if (rc)
1780                         RETURN(rc);
1781
1782                 /* and finally less blocks for the current gid */
1783                 qi->lqi_id.qid_gid = obj->oo_inode->i_gid;
1784                 qi->lqi_space      = -bspace;
1785                 rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
1786                 if (rc == -EDQUOT || rc == -EINPROGRESS)
1787                         rc = 0;
1788                 if (rc)
1789                         RETURN(rc);
1790         }
1791
1792         RETURN(rc);
1793 }
1794
1795 static int osd_inode_setattr(const struct lu_env *env,
1796                              struct inode *inode, const struct lu_attr *attr)
1797 {
1798         __u64 bits;
1799
1800         bits = attr->la_valid;
1801
1802         if (bits & LA_ATIME)
1803                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
1804         if (bits & LA_CTIME)
1805                 inode->i_ctime  = *osd_inode_time(env, inode, attr->la_ctime);
1806         if (bits & LA_MTIME)
1807                 inode->i_mtime  = *osd_inode_time(env, inode, attr->la_mtime);
1808         if (bits & LA_SIZE) {
1809                 LDISKFS_I(inode)->i_disksize = attr->la_size;
1810                 i_size_write(inode, attr->la_size);
1811         }
1812
1813 #if 0
1814         /* OSD should not change "i_blocks" which is used by quota.
1815          * "i_blocks" should be changed by ldiskfs only. */
1816         if (bits & LA_BLOCKS)
1817                 inode->i_blocks = attr->la_blocks;
1818 #endif
1819         if (bits & LA_MODE)
1820                 inode->i_mode   = (inode->i_mode & S_IFMT) |
1821                         (attr->la_mode & ~S_IFMT);
1822         if (bits & LA_UID)
1823                 inode->i_uid    = attr->la_uid;
1824         if (bits & LA_GID)
1825                 inode->i_gid    = attr->la_gid;
1826         if (bits & LA_NLINK)
1827                 set_nlink(inode, attr->la_nlink);
1828         if (bits & LA_RDEV)
1829                 inode->i_rdev   = attr->la_rdev;
1830
1831         if (bits & LA_FLAGS) {
1832                 /* always keep S_NOCMTIME */
1833                 inode->i_flags = ll_ext_to_inode_flags(attr->la_flags) |
1834                                  S_NOCMTIME;
1835         }
1836         return 0;
1837 }
1838
1839 static int osd_quota_transfer(struct inode *inode, const struct lu_attr *attr)
1840 {
1841         if ((attr->la_valid & LA_UID && attr->la_uid != inode->i_uid) ||
1842             (attr->la_valid & LA_GID && attr->la_gid != inode->i_gid)) {
1843                 struct iattr    iattr;
1844                 int             rc;
1845
1846                 ll_vfs_dq_init(inode);
1847                 iattr.ia_valid = 0;
1848                 if (attr->la_valid & LA_UID)
1849                         iattr.ia_valid |= ATTR_UID;
1850                 if (attr->la_valid & LA_GID)
1851                         iattr.ia_valid |= ATTR_GID;
1852                 iattr.ia_uid = attr->la_uid;
1853                 iattr.ia_gid = attr->la_gid;
1854
1855                 rc = ll_vfs_dq_transfer(inode, &iattr);
1856                 if (rc) {
1857                         CERROR("%s: quota transfer failed: rc = %d. Is quota "
1858                                "enforcement enabled on the ldiskfs filesystem?",
1859                                inode->i_sb->s_id, rc);
1860                         return rc;
1861                 }
1862         }
1863         return 0;
1864 }
1865
1866 static int osd_attr_set(const struct lu_env *env,
1867                         struct dt_object *dt,
1868                         const struct lu_attr *attr,
1869                         struct thandle *handle,
1870                         struct lustre_capa *capa)
1871 {
1872         struct osd_object *obj = osd_dt_obj(dt);
1873         struct inode      *inode;
1874         int rc;
1875
1876         LASSERT(handle != NULL);
1877         LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
1878         LASSERT(osd_invariant(obj));
1879
1880         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
1881                 return -EACCES;
1882
1883         osd_trans_exec_op(env, handle, OSD_OT_ATTR_SET);
1884
1885         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_FID_MAPPING)) {
1886                 struct osd_thread_info  *oti  = osd_oti_get(env);
1887                 const struct lu_fid     *fid0 = lu_object_fid(&dt->do_lu);
1888                 struct lu_fid           *fid1 = &oti->oti_fid;
1889                 struct osd_inode_id     *id   = &oti->oti_id;
1890                 struct iam_path_descr   *ipd;
1891                 struct iam_container    *bag;
1892                 struct osd_thandle      *oh;
1893                 int                      rc;
1894
1895                 fid_cpu_to_be(fid1, fid0);
1896                 memset(id, 1, sizeof(*id));
1897                 bag = &osd_fid2oi(osd_dev(dt->do_lu.lo_dev),
1898                                   fid0)->oi_dir.od_container;
1899                 ipd = osd_idx_ipd_get(env, bag);
1900                 if (unlikely(ipd == NULL))
1901                         RETURN(-ENOMEM);
1902
1903                 oh = container_of0(handle, struct osd_thandle, ot_super);
1904                 rc = iam_update(oh->ot_handle, bag, (const struct iam_key *)fid1,
1905                                 (const struct iam_rec *)id, ipd);
1906                 osd_ipd_put(env, bag, ipd);
1907                 return(rc > 0 ? 0 : rc);
1908         }
1909
1910         inode = obj->oo_inode;
1911
1912         rc = osd_quota_transfer(inode, attr);
1913         if (rc)
1914                 return rc;
1915
1916         spin_lock(&obj->oo_guard);
1917         rc = osd_inode_setattr(env, inode, attr);
1918         spin_unlock(&obj->oo_guard);
1919
1920         if (!rc)
1921                 ll_dirty_inode(inode, I_DIRTY_DATASYNC);
1922         return rc;
1923 }
1924
1925 struct dentry *osd_child_dentry_get(const struct lu_env *env,
1926                                     struct osd_object *obj,
1927                                     const char *name, const int namelen)
1928 {
1929         return osd_child_dentry_by_inode(env, obj->oo_inode, name, namelen);
1930 }
1931
1932 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
1933                       umode_t mode, struct dt_allocation_hint *hint,
1934                       struct thandle *th)
1935 {
1936         int result;
1937         struct osd_device  *osd = osd_obj2dev(obj);
1938         struct osd_thandle *oth;
1939         struct dt_object   *parent = NULL;
1940         struct inode       *inode;
1941
1942         LINVRNT(osd_invariant(obj));
1943         LASSERT(obj->oo_inode == NULL);
1944         LASSERT(obj->oo_hl_head == NULL);
1945
1946         if (S_ISDIR(mode) && ldiskfs_pdo) {
1947                 obj->oo_hl_head =ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
1948                 if (obj->oo_hl_head == NULL)
1949                         return -ENOMEM;
1950         }
1951
1952         oth = container_of(th, struct osd_thandle, ot_super);
1953         LASSERT(oth->ot_handle->h_transaction != NULL);
1954
1955         if (hint && hint->dah_parent)
1956                 parent = hint->dah_parent;
1957
1958         inode = ldiskfs_create_inode(oth->ot_handle,
1959                                      parent ? osd_dt_obj(parent)->oo_inode :
1960                                               osd_sb(osd)->s_root->d_inode,
1961                                      mode);
1962         if (!IS_ERR(inode)) {
1963                 /* Do not update file c/mtime in ldiskfs.
1964                  * NB: don't need any lock because no contention at this
1965                  * early stage */
1966                 inode->i_flags |= S_NOCMTIME;
1967
1968                 /* For new created object, it must be consistent,
1969                  * and it is unnecessary to scrub against it. */
1970                 ldiskfs_set_inode_state(inode, LDISKFS_STATE_LUSTRE_NOSCRUB);
1971                 ldiskfs_clear_inode_state(inode, LDISKFS_STATE_LUSTRE_NO_OI);
1972                 obj->oo_inode = inode;
1973                 result = 0;
1974         } else {
1975                 if (obj->oo_hl_head != NULL) {
1976                         ldiskfs_htree_lock_head_free(obj->oo_hl_head);
1977                         obj->oo_hl_head = NULL;
1978                 }
1979                 result = PTR_ERR(inode);
1980         }
1981         LINVRNT(osd_invariant(obj));
1982         return result;
1983 }
1984
/* NOTE(review): presumably the longest name OSD handles; not referenced in
 * the code near this definition -- confirm against its users. */
enum { OSD_NAME_LEN = 255 };
1988
1989 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
1990                      struct lu_attr *attr,
1991                      struct dt_allocation_hint *hint,
1992                      struct dt_object_format *dof,
1993                      struct thandle *th)
1994 {
1995         int result;
1996         struct osd_thandle *oth;
1997         __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX));
1998
1999         LASSERT(S_ISDIR(attr->la_mode));
2000
2001         oth = container_of(th, struct osd_thandle, ot_super);
2002         LASSERT(oth->ot_handle->h_transaction != NULL);
2003         result = osd_mkfile(info, obj, mode, hint, th);
2004
2005         return result;
2006 }
2007
2008 static int osd_mk_index(struct osd_thread_info *info, struct osd_object *obj,
2009                         struct lu_attr *attr,
2010                         struct dt_allocation_hint *hint,
2011                         struct dt_object_format *dof,
2012                         struct thandle *th)
2013 {
2014         int result;
2015         struct osd_thandle *oth;
2016         const struct dt_index_features *feat = dof->u.dof_idx.di_feat;
2017
2018         __u32 mode = (attr->la_mode & (S_IFMT | S_IALLUGO | S_ISVTX));
2019
2020         LASSERT(S_ISREG(attr->la_mode));
2021
2022         oth = container_of(th, struct osd_thandle, ot_super);
2023         LASSERT(oth->ot_handle->h_transaction != NULL);
2024
2025         result = osd_mkfile(info, obj, mode, hint, th);
2026         if (result == 0) {
2027                 LASSERT(obj->oo_inode != NULL);
2028                 if (feat->dif_flags & DT_IND_VARKEY)
2029                         result = iam_lvar_create(obj->oo_inode,
2030                                                  feat->dif_keysize_max,
2031                                                  feat->dif_ptrsize,
2032                                                  feat->dif_recsize_max,
2033                                                  oth->ot_handle);
2034                 else
2035                         result = iam_lfix_create(obj->oo_inode,
2036                                                  feat->dif_keysize_max,
2037                                                  feat->dif_ptrsize,
2038                                                  feat->dif_recsize_max,
2039                                                  oth->ot_handle);
2040
2041         }
2042         return result;
2043 }
2044
2045 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
2046                      struct lu_attr *attr,
2047                      struct dt_allocation_hint *hint,
2048                      struct dt_object_format *dof,
2049                      struct thandle *th)
2050 {
2051         LASSERT(S_ISREG(attr->la_mode));
2052         return osd_mkfile(info, obj, (attr->la_mode &
2053                                (S_IFMT | S_IALLUGO | S_ISVTX)), hint, th);
2054 }
2055
2056 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
2057                      struct lu_attr *attr,
2058                      struct dt_allocation_hint *hint,
2059                      struct dt_object_format *dof,
2060                      struct thandle *th)
2061 {
2062         LASSERT(S_ISLNK(attr->la_mode));
2063         return osd_mkfile(info, obj, (attr->la_mode &
2064                               (S_IFMT | S_IALLUGO | S_ISVTX)), hint, th);
2065 }
2066
2067 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
2068                      struct lu_attr *attr,
2069                      struct dt_allocation_hint *hint,
2070                      struct dt_object_format *dof,
2071                      struct thandle *th)
2072 {
2073         umode_t mode = attr->la_mode & (S_IFMT | S_IALLUGO | S_ISVTX);
2074         int result;
2075
2076         LINVRNT(osd_invariant(obj));
2077         LASSERT(obj->oo_inode == NULL);
2078         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
2079                 S_ISFIFO(mode) || S_ISSOCK(mode));
2080
2081         result = osd_mkfile(info, obj, mode, hint, th);
2082         if (result == 0) {
2083                 LASSERT(obj->oo_inode != NULL);
2084                 /*
2085                  * This inode should be marked dirty for i_rdev.  Currently
2086                  * that is done in the osd_attr_init().
2087                  */
2088                 init_special_inode(obj->oo_inode, obj->oo_inode->i_mode,
2089                                    attr->la_rdev);
2090         }
2091         LINVRNT(osd_invariant(obj));
2092         return result;
2093 }
2094
/* Signature shared by the per-type inode constructors (osd_mkdir(),
 * osd_mkreg(), osd_mksym(), osd_mknod(), osd_mk_index()). */
typedef int (*osd_obj_type_f)(struct osd_thread_info *info,
                              struct osd_object *obj,
                              struct lu_attr *attr,
                              struct dt_allocation_hint *hint,
                              struct dt_object_format *dof,
                              struct thandle *th);
2100
2101 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
2102 {
2103         osd_obj_type_f result;
2104
2105         switch (type) {
2106         case DFT_DIR:
2107                 result = osd_mkdir;
2108                 break;
2109         case DFT_REGULAR:
2110                 result = osd_mkreg;
2111                 break;
2112         case DFT_SYM:
2113                 result = osd_mksym;
2114                 break;
2115         case DFT_NODE:
2116                 result = osd_mknod;
2117                 break;
2118         case DFT_INDEX:
2119                 result = osd_mk_index;
2120                 break;
2121
2122         default:
2123                 LBUG();
2124                 break;
2125         }
2126         return result;
2127 }
2128
2129
2130 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
2131                         struct dt_object *parent, struct dt_object *child,
2132                         umode_t child_mode)
2133 {
2134         LASSERT(ah);
2135
2136         memset(ah, 0, sizeof(*ah));
2137         ah->dah_parent = parent;
2138         ah->dah_mode = child_mode;
2139 }
2140
2141 static void osd_attr_init(struct osd_thread_info *info, struct osd_object *obj,
2142                           struct lu_attr *attr, struct dt_object_format *dof)
2143 {
2144         struct inode   *inode = obj->oo_inode;
2145         __u64           valid = attr->la_valid;
2146         int             result;
2147
2148         attr->la_valid &= ~(LA_TYPE | LA_MODE);
2149
2150         if (dof->dof_type != DFT_NODE)
2151                 attr->la_valid &= ~LA_RDEV;
2152         if ((valid & LA_ATIME) && (attr->la_atime == LTIME_S(inode->i_atime)))
2153                 attr->la_valid &= ~LA_ATIME;
2154         if ((valid & LA_CTIME) && (attr->la_ctime == LTIME_S(inode->i_ctime)))
2155                 attr->la_valid &= ~LA_CTIME;
2156         if ((valid & LA_MTIME) && (attr->la_mtime == LTIME_S(inode->i_mtime)))
2157                 attr->la_valid &= ~LA_MTIME;
2158
2159         result = osd_quota_transfer(inode, attr);
2160         if (result)
2161                 return;
2162
2163         if (attr->la_valid != 0) {
2164                 result = osd_inode_setattr(info->oti_env, inode, attr);
2165                 /*
2166                  * The osd_inode_setattr() should always succeed here.  The
2167                  * only error that could be returned is EDQUOT when we are
2168                  * trying to change the UID or GID of the inode. However, this
2169                  * should not happen since quota enforcement is no longer
2170                  * enabled on ldiskfs (lquota takes care of it).
2171                  */
2172                 LASSERTF(result == 0, "%d", result);
2173                 ll_dirty_inode(inode, I_DIRTY_DATASYNC);
2174         }
2175
2176         attr->la_valid = valid;
2177 }
2178
2179 /**
2180  * Helper function for osd_object_create()
2181  *
2182  * \retval 0, on success
2183  */
2184 static int __osd_object_create(struct osd_thread_info *info,
2185                                struct osd_object *obj, struct lu_attr *attr,
2186                                struct dt_allocation_hint *hint,
2187                                struct dt_object_format *dof,
2188                                struct thandle *th)
2189 {
2190         int     result;
2191         __u32   umask;
2192
2193         /* we drop umask so that permissions we pass are not affected */
2194         umask = current->fs->umask;
2195         current->fs->umask = 0;
2196
2197         result = osd_create_type_f(dof->dof_type)(info, obj, attr, hint, dof,
2198                                                   th);
2199         if (result == 0) {
2200                 osd_attr_init(info, obj, attr, dof);
2201                 osd_object_init0(obj);
2202                 /* bz 24037 */
2203                 if (obj->oo_inode && (obj->oo_inode->i_state & I_NEW))
2204                         unlock_new_inode(obj->oo_inode);
2205         }
2206
2207         /* restore previous umask value */
2208         current->fs->umask = umask;
2209
2210         return result;
2211 }
2212
2213 /**
2214  * Helper function for osd_object_create()
2215  *
2216  * \retval 0, on success
2217  */
2218 static int __osd_oi_insert(const struct lu_env *env, struct osd_object *obj,
2219                            const struct lu_fid *fid, struct thandle *th)
2220 {
2221         struct osd_thread_info *info = osd_oti_get(env);
2222         struct osd_inode_id    *id   = &info->oti_id;
2223         struct osd_device      *osd  = osd_obj2dev(obj);
2224
2225         LASSERT(obj->oo_inode != NULL);
2226
2227         osd_id_gen(id, obj->oo_inode->i_ino, obj->oo_inode->i_generation);
2228         return osd_oi_insert(info, osd, fid, id, th, OI_CHECK_FLD);
2229 }
2230
2231 int osd_fld_lookup(const struct lu_env *env, struct osd_device *osd,
2232                    obd_seq seq, struct lu_seq_range *range)
2233 {
2234         struct seq_server_site  *ss = osd_seq_site(osd);
2235         int                     rc;
2236
2237         if (fid_seq_is_idif(seq)) {
2238                 fld_range_set_ost(range);
2239                 range->lsr_index = idif_ost_idx(seq);
2240                 return 0;
2241         }
2242
2243         if (!fid_seq_in_fldb(seq)) {
2244                 fld_range_set_mdt(range);
2245                 if (ss != NULL)
2246                         /* FIXME: If ss is NULL, it suppose not get lsr_index
2247                          * at all */
2248                         range->lsr_index = ss->ss_node_id;
2249                 return 0;
2250         }
2251
2252         LASSERT(ss != NULL);
2253         fld_range_set_any(range);
2254         rc = fld_server_lookup(env, ss->ss_server_fld, seq, range);
2255         if (rc != 0) {
2256                 CERROR("%s: cannot find FLD range for "LPX64": rc = %d\n",
2257                        osd_name(osd), seq, rc);
2258         }
2259         return rc;
2260 }
2261
2262 /*
2263  * Concurrency: no external locking is necessary.
2264  */
2265 static int osd_declare_object_create(const struct lu_env *env,
2266                                      struct dt_object *dt,
2267                                      struct lu_attr *attr,
2268                                      struct dt_allocation_hint *hint,
2269                                      struct dt_object_format *dof,
2270                                      struct thandle *handle)
2271 {
2272         struct lu_seq_range     *range = &osd_oti_get(env)->oti_seq_range;
2273         struct osd_thandle      *oh;
2274         int                      rc;
2275         ENTRY;
2276
2277         LASSERT(handle != NULL);
2278
2279         oh = container_of0(handle, struct osd_thandle, ot_super);
2280         LASSERT(oh->ot_handle == NULL);
2281
2282         osd_trans_declare_op(env, oh, OSD_OT_CREATE,
2283                              osd_dto_credits_noquota[DTO_OBJECT_CREATE]);
2284         /* Reuse idle OI block may cause additional one OI block
2285          * to be changed. */
2286         osd_trans_declare_op(env, oh, OSD_OT_INSERT,
2287                              osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1);
2288
2289         /* If this is directory, then we expect . and .. to be inserted as
2290          * well. The one directory block always needs to be created for the
2291          * directory, so we could use DTO_WRITE_BASE here (GDT, block bitmap,
2292          * block), there is no danger of needing a tree for the first block.
2293          */
2294         if (attr && S_ISDIR(attr->la_mode)) {
2295                 osd_trans_declare_op(env, oh, OSD_OT_INSERT,
2296                                      osd_dto_credits_noquota[DTO_WRITE_BASE]);
2297                 osd_trans_declare_op(env, oh, OSD_OT_INSERT, 0);
2298         }
2299
2300         if (!attr)
2301                 RETURN(0);
2302
2303         rc = osd_declare_inode_qid(env, attr->la_uid, attr->la_gid, 1, oh,
2304                                    osd_dt_obj(dt), false, NULL, false);
2305         if (rc != 0)
2306                 RETURN(rc);
2307
2308         /* It does fld look up inside declare, and the result will be
2309          * added to fld cache, so the following fld lookup inside insert
2310          * does not need send RPC anymore, so avoid send rpc with holding
2311          * transaction */
2312         if (fid_is_norm(lu_object_fid(&dt->do_lu)) &&
2313                 !fid_is_last_id(lu_object_fid(&dt->do_lu)))
2314                 osd_fld_lookup(env, osd_dt_dev(handle->th_dev),
2315                                fid_seq(lu_object_fid(&dt->do_lu)), range);
2316
2317
2318         RETURN(rc);
2319 }
2320
2321 static int osd_object_create(const struct lu_env *env, struct dt_object *dt,
2322                              struct lu_attr *attr,
2323                              struct dt_allocation_hint *hint,
2324                              struct dt_object_format *dof,
2325                              struct thandle *th)
2326 {
2327         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
2328         struct osd_object      *obj    = osd_dt_obj(dt);
2329         struct osd_thread_info *info   = osd_oti_get(env);
2330         int result;
2331
2332         ENTRY;
2333
2334         LINVRNT(osd_invariant(obj));
2335         LASSERT(!dt_object_exists(dt) && !dt_object_remote(dt));
2336         LASSERT(osd_write_locked(env, obj));
2337         LASSERT(th != NULL);
2338
2339         if (unlikely(fid_is_acct(fid)))
2340                 /* Quota files can't be created from the kernel any more,
2341                  * 'tune2fs -O quota' will take care of creating them */
2342                 RETURN(-EPERM);
2343
2344         osd_trans_exec_op(env, th, OSD_OT_CREATE);
2345         osd_trans_declare_rb(env, th, OSD_OT_REF_ADD);
2346
2347         result = __osd_object_create(info, obj, attr, hint, dof, th);
2348         if (result == 0)
2349                 result = __osd_oi_insert(env, obj, fid, th);
2350
2351         LASSERT(ergo(result == 0,
2352                      dt_object_exists(dt) && !dt_object_remote(dt)));
2353
2354         LASSERT(osd_invariant(obj));
2355         RETURN(result);
2356 }
2357
2358 /**
2359  * Called to destroy on-disk representation of the object
2360  *
2361  * Concurrency: must be locked
2362  */
2363 static int osd_declare_object_destroy(const struct lu_env *env,
2364                                       struct dt_object *dt,
2365                                       struct thandle *th)
2366 {
2367         struct osd_object  *obj = osd_dt_obj(dt);
2368         struct inode       *inode = obj->oo_inode;
2369         struct osd_thandle *oh;
2370         int                 rc;
2371         ENTRY;
2372
2373         oh = container_of0(th, struct osd_thandle, ot_super);
2374         LASSERT(oh->ot_handle == NULL);
2375         LASSERT(inode);
2376
2377         osd_trans_declare_op(env, oh, OSD_OT_DESTROY,
2378                              osd_dto_credits_noquota[DTO_OBJECT_DELETE]);
2379         /* Recycle idle OI leaf may cause additional three OI blocks
2380          * to be changed. */
2381         osd_trans_declare_op(env, oh, OSD_OT_DELETE,
2382                              osd_dto_credits_noquota[DTO_INDEX_DELETE] + 3);
2383         /* one less inode */
2384         rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, -1, oh,
2385                                    obj, false, NULL, false);
2386         if (rc)
2387                 RETURN(rc);
2388         /* data to be truncated */
2389         rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh,
2390                                    obj, true, NULL, false);
2391         RETURN(rc);
2392 }
2393
2394 static int osd_object_destroy(const struct lu_env *env,
2395                               struct dt_object *dt,
2396                               struct thandle *th)
2397 {
2398         const struct lu_fid    *fid = lu_object_fid(&dt->do_lu);
2399         struct osd_object      *obj = osd_dt_obj(dt);
2400         struct inode           *inode = obj->oo_inode;
2401         struct osd_device      *osd = osd_obj2dev(obj);
2402         struct osd_thandle     *oh;
2403         int                     result;
2404         ENTRY;
2405
2406         oh = container_of0(th, struct osd_thandle, ot_super);
2407         LASSERT(oh->ot_handle);
2408         LASSERT(inode);
2409         LASSERT(!lu_object_is_dying(dt->do_lu.lo_header));
2410
2411         if (unlikely(fid_is_acct(fid)))
2412                 RETURN(-EPERM);
2413
2414         if (S_ISDIR(inode->i_mode)) {
2415                 LASSERT(osd_inode_unlinked(inode) || inode->i_nlink == 1 ||
2416                         inode->i_nlink == 2);
2417                 /* it will check/delete the inode from remote parent,
2418                  * how to optimize it? unlink performance impaction XXX */
2419                 result = osd_delete_from_remote_parent(env, osd, obj, oh);
2420                 if (result != 0 && result != -ENOENT) {
2421                         CERROR("%s: delete inode "DFID": rc = %d\n",
2422                                osd_name(osd), PFID(fid), result);
2423                 }
2424                 spin_lock(&obj->oo_guard);
2425                 clear_nlink(inode);
2426                 spin_unlock(&obj->oo_guard);
2427                 ll_dirty_inode(inode, I_DIRTY_DATASYNC);
2428         }
2429
2430         osd_trans_exec_op(env, th, OSD_OT_DESTROY);
2431
2432         result = osd_oi_delete(osd_oti_get(env), osd, fid, th, OI_CHECK_FLD);
2433
2434         /* XXX: add to ext3 orphan list */
2435         /* rc = ext3_orphan_add(handle_t *handle, struct inode *inode) */
2436
2437         /* not needed in the cache anymore */
2438         set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags);
2439
2440         RETURN(0);
2441 }
2442
2443 /**
2444  * Put the fid into lustre_mdt_attrs, and then place the structure
2445  * inode's ea. This fid should not be altered during the life time
2446  * of the inode.
2447  *
2448  * \retval +ve, on success
2449  * \retval -ve, on error
2450  *
2451  * FIXME: It is good to have/use ldiskfs_xattr_set_handle() here
2452  */
2453 int osd_ea_fid_set(struct osd_thread_info *info, struct inode *inode,
2454                    const struct lu_fid *fid, __u32 compat, __u32 incompat)
2455 {
2456         struct lustre_mdt_attrs *lma = &info->oti_mdt_attrs;
2457         int                      rc;
2458         ENTRY;
2459
2460         if (OBD_FAIL_CHECK(OBD_FAIL_FID_INLMA))
2461                 RETURN(0);
2462
2463         lustre_lma_init(lma, fid, compat, incompat);
2464         lustre_lma_swab(lma);
2465
2466         rc = __osd_xattr_set(info, inode, XATTR_NAME_LMA, lma, sizeof(*lma),
2467                              XATTR_CREATE);
2468         /* LMA may already exist, but we need to check that all the
2469          * desired compat/incompat flags have been added. */
2470         if (unlikely(rc == -EEXIST)) {
2471                 if (compat == 0 && incompat == 0)
2472                         RETURN(0);
2473
2474                 rc = __osd_xattr_get(inode, &info->oti_obj_dentry,
2475                                      XATTR_NAME_LMA, info->oti_mdt_attrs_old,
2476                                      LMA_OLD_SIZE);
2477                 if (rc <= 0)
2478                         RETURN(-EINVAL);
2479
2480                 lustre_lma_swab(lma);
2481                 if (!(~lma->lma_compat & compat) &&
2482                     !(~lma->lma_incompat & incompat))
2483                         RETURN(0);
2484
2485                 lma->lma_compat |= compat;
2486                 lma->lma_incompat |= incompat;
2487                 lustre_lma_swab(lma);
2488                 rc = __osd_xattr_set(info, inode, XATTR_NAME_LMA, lma,
2489                                      sizeof(*lma), XATTR_REPLACE);
2490         }
2491
2492         RETURN(rc);
2493 }
2494
2495 /**
2496  * ldiskfs supports fid in dirent, it is passed in dentry->d_fsdata.
2497  * lustre 1.8 also uses d_fsdata for passing other info to ldiskfs.
2498  * To have compatilibility with 1.8 ldiskfs driver we need to have
2499  * magic number at start of fid data.
2500  * \ldiskfs_dentry_param is used only to pass fid from osd to ldiskfs.
2501  * its inmemory API.
2502  */
2503 void osd_get_ldiskfs_dirent_param(struct ldiskfs_dentry_param *param,
2504                                   const struct dt_rec *fid)
2505 {
2506         if (!fid_is_namespace_visible((const struct lu_fid *)fid) ||
2507             OBD_FAIL_CHECK(OBD_FAIL_FID_IGIF)) {
2508                 param->edp_magic = 0;
2509                 return;
2510         }
2511
2512         param->edp_magic = LDISKFS_LUFID_MAGIC;
2513         param->edp_len =  sizeof(struct lu_fid) + 1;
2514         fid_cpu_to_be((struct lu_fid *)param->edp_data, (struct lu_fid *)fid);
2515 }
2516
2517 /**
2518  * Try to read the fid from inode ea into dt_rec.
2519  *
2520  * \param fid object fid.
2521  *
2522  * \retval 0 on success
2523  */
2524 static int osd_ea_fid_get(const struct lu_env *env, struct osd_object *obj,
2525                           __u32 ino, struct lu_fid *fid,
2526                           struct osd_inode_id *id)
2527 {
2528         struct osd_thread_info *info  = osd_oti_get(env);
2529         struct inode           *inode;
2530         ENTRY;
2531
2532         osd_id_gen(id, ino, OSD_OII_NOGEN);
2533         inode = osd_iget_fid(info, osd_obj2dev(obj), id, fid);
2534         if (IS_ERR(inode))
2535                 RETURN(PTR_ERR(inode));
2536
2537         iput(inode);
2538         RETURN(0);
2539 }
2540
2541 static int osd_add_dot_dotdot_internal(struct osd_thread_info *info,
2542                                         struct inode *dir,
2543                                         struct inode  *parent_dir,
2544                                         const struct dt_rec *dot_fid,
2545                                         const struct dt_rec *dot_dot_fid,
2546                                         struct osd_thandle *oth)
2547 {
2548         struct ldiskfs_dentry_param *dot_ldp;
2549         struct ldiskfs_dentry_param *dot_dot_ldp;
2550
2551         dot_dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp2;
2552         osd_get_ldiskfs_dirent_param(dot_dot_ldp, dot_dot_fid);
2553
2554         dot_ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
2555         dot_ldp->edp_magic = 0;
2556         return ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir,
2557                                         dir, dot_ldp, dot_dot_ldp);
2558 }
2559
2560 /**
2561  * Create an local agent inode for remote entry
2562  */
2563 static struct inode *osd_create_local_agent_inode(const struct lu_env *env,
2564                                                   struct osd_device *osd,
2565                                                   struct osd_object *pobj,
2566                                                   const struct lu_fid *fid,
2567                                                   struct thandle *th)
2568 {
2569         struct osd_thread_info  *info = osd_oti_get(env);
2570         struct inode            *local;
2571         struct osd_thandle      *oh;
2572         int                     rc;
2573         ENTRY;
2574
2575         LASSERT(th);
2576         oh = container_of(th, struct osd_thandle, ot_super);
2577         LASSERT(oh->ot_handle->h_transaction != NULL);
2578
2579         /* FIXME: Insert index api needs to know the mode of
2580          * the remote object. Just use S_IFDIR for now */
2581         local = ldiskfs_create_inode(oh->ot_handle, pobj->oo_inode, S_IFDIR);
2582         if (IS_ERR(local)) {
2583                 CERROR("%s: create local error %d\n", osd_name(osd),
2584                        (int)PTR_ERR(local));
2585                 RETURN(local);
2586         }
2587
2588         ldiskfs_set_inode_state(local, LDISKFS_STATE_LUSTRE_NO_OI);
2589         /* Set special LMA flag for local agent inode */
2590         rc = osd_ea_fid_set(info, local, fid, 0, LMAI_AGENT);
2591         if (rc != 0) {
2592                 CERROR("%s: set LMA for "DFID" remote inode failed: rc = %d\n",
2593                        osd_name(osd), PFID(fid), rc);
2594                 RETURN(ERR_PTR(rc));
2595         }
2596
2597         rc = osd_add_dot_dotdot_internal(info, local, pobj->oo_inode,
2598                 (const struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
2599                 (const struct dt_rec *)fid, oh);
2600         if (rc != 0) {
2601                 CERROR("%s: "DFID" add dot dotdot error: rc = %d\n",
2602                         osd_name(osd), PFID(fid), rc);
2603                 RETURN(ERR_PTR(rc));
2604         }
2605
2606         RETURN(local);
2607 }
2608
2609 /**
2610  * Delete local agent inode for remote entry
2611  */
2612 static int osd_delete_local_agent_inode(const struct lu_env *env,
2613                                         struct osd_device *osd,
2614                                         const struct lu_fid *fid,
2615                                         __u32 ino, struct osd_thandle *oh)
2616 {
2617         struct osd_thread_info  *oti = osd_oti_get(env);
2618         struct osd_inode_id     *id = &oti->oti_id;
2619         struct inode            *inode;
2620         ENTRY;
2621
2622         id->oii_ino = le32_to_cpu(ino);
2623         id->oii_gen = OSD_OII_NOGEN;
2624         inode = osd_iget(oti, osd, id);
2625         if (IS_ERR(inode)) {
2626                 CERROR("%s: iget error "DFID" id %u:%u\n", osd_name(osd),
2627                        PFID(fid), id->oii_ino, id->oii_gen);
2628                 RETURN(PTR_ERR(inode));
2629         }
2630
2631         clear_nlink(inode);
2632         mark_inode_dirty(inode);
2633         CDEBUG(D_INODE, "%s: delete remote inode "DFID" %lu\n",
2634                 osd_name(osd), PFID(fid), inode->i_ino);
2635         iput(inode);
2636         RETURN(0);
2637 }
2638
2639 /**
2640  * OSD layer object create function for interoperability mode (b11826).
2641  * This is mostly similar to osd_object_create(). Only difference being, fid is
2642  * inserted into inode ea here.
2643  *
2644  * \retval   0, on success
2645  * \retval -ve, on error
2646  */
2647 static int osd_object_ea_create(const struct lu_env *env, struct dt_object *dt,
2648                                 struct lu_attr *attr,
2649                                 struct dt_allocation_hint *hint,
2650                                 struct dt_object_format *dof,
2651                                 struct thandle *th)
2652 {
2653         const struct lu_fid    *fid    = lu_object_fid(&dt->do_lu);
2654         struct osd_object      *obj    = osd_dt_obj(dt);
2655         struct osd_thread_info *info   = osd_oti_get(env);
2656         int                     result;
2657
2658         ENTRY;
2659
2660         LASSERT(osd_invariant(obj));
2661         LASSERT(!dt_object_exists(dt) && !dt_object_remote(dt));
2662         LASSERT(osd_write_locked(env, obj));
2663         LASSERT(th != NULL);
2664
2665         if (unlikely(fid_is_acct(fid)))
2666                 /* Quota files can't be created from the kernel any more,
2667                  * 'tune2fs -O quota' will take care of creating them */
2668                 RETURN(-EPERM);
2669
2670         osd_trans_exec_op(env, th, OSD_OT_CREATE);
2671         osd_trans_declare_rb(env, th, OSD_OT_REF_ADD);
2672
2673         result = __osd_object_create(info, obj, attr, hint, dof, th);
2674         if (result == 0)
2675                 result = osd_ea_fid_set(info, obj->oo_inode, fid,
2676                                 fid_is_on_ost(info, osd_obj2dev(obj),
2677                                               fid, OI_CHECK_FLD) ?
2678                                 LMAC_FID_ON_OST : 0, 0);
2679
2680         if (result == 0)
2681                 result = __osd_oi_insert(env, obj, fid, th);
2682
2683         LASSERT(ergo(result == 0,
2684                      dt_object_exists(dt) && !dt_object_remote(dt)));
2685         LINVRNT(osd_invariant(obj));
2686         RETURN(result);
2687 }
2688
2689 static int osd_declare_object_ref_add(const struct lu_env *env,
2690                                       struct dt_object *dt,
2691                                       struct thandle *handle)
2692 {
2693         struct osd_thandle       *oh;
2694
2695         /* it's possible that object doesn't exist yet */
2696         LASSERT(handle != NULL);
2697
2698         oh = container_of0(handle, struct osd_thandle, ot_super);
2699         LASSERT(oh->ot_handle == NULL);
2700
2701         osd_trans_declare_op(env, oh, OSD_OT_REF_ADD,
2702                              osd_dto_credits_noquota[DTO_ATTR_SET_BASE]);
2703
2704         return 0;
2705 }
2706
2707 /*
2708  * Concurrency: @dt is write locked.
2709  */
2710 static int osd_object_ref_add(const struct lu_env *env,
2711                               struct dt_object *dt, struct thandle *th)
2712 {
2713         struct osd_object  *obj = osd_dt_obj(dt);
2714         struct inode       *inode = obj->oo_inode;
2715         struct osd_thandle *oh;
2716         int                 rc = 0;
2717
2718         LINVRNT(osd_invariant(obj));
2719         LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
2720         LASSERT(osd_write_locked(env, obj));
2721         LASSERT(th != NULL);
2722
2723         oh = container_of0(th, struct osd_thandle, ot_super);
2724         LASSERT(oh->ot_handle != NULL);
2725
2726         osd_trans_exec_op(env, th, OSD_OT_REF_ADD);
2727
2728         /*
2729          * The DIR_NLINK feature allows directories to exceed LDISKFS_LINK_MAX
2730          * (65000) subdirectories by storing "1" in i_nlink if the link count
2731          * would otherwise overflow. Directory tranversal tools understand
2732          * that (st_nlink == 1) indicates that the filesystem dose not track
2733          * hard links count on the directory, and will not abort subdirectory
2734          * scanning early once (st_nlink - 2) subdirs have been found.
2735          *
2736          * This also has to properly handle the case of inodes with nlink == 0
2737          * in case they are being linked into the PENDING directory
2738          */
2739         spin_lock(&obj->oo_guard);
2740         ldiskfs_inc_count(oh->ot_handle, inode);
2741         LASSERT(inode->i_nlink <= LDISKFS_LINK_MAX);
2742         spin_unlock(&obj->oo_guard);
2743
2744         ll_dirty_inode(inode, I_DIRTY_DATASYNC);
2745         LINVRNT(osd_invariant(obj));
2746
2747         return rc;
2748 }
2749
2750 static int osd_declare_object_ref_del(const struct lu_env *env,
2751                                       struct dt_object *dt,
2752                                       struct thandle *handle)
2753 {
2754         struct osd_thandle *oh;
2755
2756         LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
2757         LASSERT(handle != NULL);
2758
2759         oh = container_of0(handle, struct osd_thandle, ot_super);
2760         LASSERT(oh->ot_handle == NULL);
2761
2762         osd_trans_declare_op(env, oh, OSD_OT_REF_DEL,
2763                              osd_dto_credits_noquota[DTO_ATTR_SET_BASE]);
2764
2765         return 0;
2766 }
2767
2768 /*
2769  * Concurrency: @dt is write locked.
2770  */
2771 static int osd_object_ref_del(const struct lu_env *env, struct dt_object *dt,
2772                               struct thandle *th)
2773 {
2774         struct osd_object       *obj = osd_dt_obj(dt);
2775         struct inode            *inode = obj->oo_inode;
2776         struct osd_device       *osd = osd_dev(dt->do_lu.lo_dev);
2777         struct osd_thandle      *oh;
2778
2779         LINVRNT(osd_invariant(obj));
2780         LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
2781         LASSERT(osd_write_locked(env, obj));
2782         LASSERT(th != NULL);
2783
2784         oh = container_of0(th, struct osd_thandle, ot_super);
2785         LASSERT(oh->ot_handle != NULL);
2786
2787         osd_trans_exec_op(env, th, OSD_OT_REF_DEL);
2788
2789         spin_lock(&obj->oo_guard);
2790         /* That can be result of upgrade from old Lustre version and
2791          * applied only to local files.  Just skip this ref_del call.
2792          * ext4_unlink() only treats this as a warning, don't LASSERT here.*/
2793         if (inode->i_nlink == 0) {
2794                 CDEBUG_LIMIT(fid_is_norm(lu_object_fid(&dt->do_lu)) ?
2795                              D_ERROR : D_INODE, "%s: nlink == 0 on "DFID
2796                              ", maybe an upgraded file? (LU-3915)\n",
2797                              osd_name(osd), PFID(lu_object_fid(&dt->do_lu)));
2798                 spin_unlock(&obj->oo_guard);
2799                 return 0;
2800         }
2801
2802         ldiskfs_dec_count(oh->ot_handle, inode);
2803         spin_unlock(&obj->oo_guard);
2804
2805         ll_dirty_inode(inode, I_DIRTY_DATASYNC);
2806         LINVRNT(osd_invariant(obj));
2807
2808         return 0;
2809 }
2810
2811 /*
2812  * Get the 64-bit version for an inode.
2813  */
2814 static int osd_object_version_get(const struct lu_env *env,
2815                                   struct dt_object *dt, dt_obj_version_t *ver)
2816 {
2817         struct inode *inode = osd_dt_obj(dt)->oo_inode;
2818
2819         CDEBUG(D_INODE, "Get version "LPX64" for inode %lu\n",
2820                LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2821         *ver = LDISKFS_I(inode)->i_fs_version;
2822         return 0;
2823 }
2824
2825 /*
2826  * Concurrency: @dt is read locked.
2827  */
2828 static int osd_xattr_get(const struct lu_env *env, struct dt_object *dt,
2829                          struct lu_buf *buf, const char *name,
2830                          struct lustre_capa *capa)
2831 {
2832         struct osd_object      *obj    = osd_dt_obj(dt);
2833         struct inode           *inode  = obj->oo_inode;
2834         struct osd_thread_info *info   = osd_oti_get(env);
2835         struct dentry          *dentry = &info->oti_obj_dentry;
2836
2837         /* version get is not real XATTR but uses xattr API */
2838         if (strcmp(name, XATTR_NAME_VERSION) == 0) {
2839                 /* for version we are just using xattr API but change inode
2840                  * field instead */
2841                 LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
2842                 osd_object_version_get(env, dt, buf->lb_buf);
2843                 return sizeof(dt_obj_version_t);
2844         }
2845
2846         LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
2847         LASSERT(inode->i_op != NULL && inode->i_op->getxattr != NULL);
2848
2849         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2850                 return -EACCES;
2851
2852         return __osd_xattr_get(inode, dentry, name, buf->lb_buf, buf->lb_len);
2853 }
2854
2855
2856 static int osd_declare_xattr_set(const struct lu_env *env,
2857                                  struct dt_object *dt,
2858                                  const struct lu_buf *buf, const char *name,
2859                                  int fl, struct thandle *handle)
2860 {
2861         struct osd_thandle *oh;
2862         int credits;
2863         struct super_block *sb = osd_sb(osd_dev(dt->do_lu.lo_dev));
2864
2865         LASSERT(handle != NULL);
2866
2867         oh = container_of0(handle, struct osd_thandle, ot_super);
2868         LASSERT(oh->ot_handle == NULL);
2869
2870         /* optimistic optimization: LMA is set first and usually fit inode */
2871         if (strcmp(name, XATTR_NAME_LMA) == 0) {
2872                 if (dt_object_exists(dt))
2873                         credits = 0;
2874                 else
2875                         credits = 1;
2876         } else if (strcmp(name, XATTR_NAME_VERSION) == 0) {
2877                 credits = 1;
2878         } else {
2879                 credits = osd_dto_credits_noquota[DTO_XATTR_SET];
2880                 if (buf && buf->lb_len > sb->s_blocksize) {
2881                         credits *= (buf->lb_len + sb->s_blocksize - 1) >>
2882                                         sb->s_blocksize_bits;
2883                 }
2884                 /*
2885                  * xattr set may involve inode quota change, reserve credits for
2886                  * dquot_initialize()
2887                  */
2888                 oh->ot_credits += LDISKFS_MAXQUOTAS_INIT_BLOCKS(sb);
2889         }
2890
2891         osd_trans_declare_op(env, oh, OSD_OT_XATTR_SET, credits);
2892
2893         return 0;
2894 }
2895
2896 /*
2897  * Set the 64-bit version for object
2898  */
2899 static void osd_object_version_set(const struct lu_env *env,
2900                                    struct dt_object *dt,
2901                                    dt_obj_version_t *new_version)
2902 {
2903         struct inode *inode = osd_dt_obj(dt)->oo_inode;
2904
2905         CDEBUG(D_INODE, "Set version "LPX64" (old "LPX64") for inode %lu\n",
2906                *new_version, LDISKFS_I(inode)->i_fs_version, inode->i_ino);
2907
2908         LDISKFS_I(inode)->i_fs_version = *new_version;
2909         /** Version is set after all inode operations are finished,
2910          *  so we should mark it dirty here */
2911         ll_dirty_inode(inode, I_DIRTY_DATASYNC);
2912 }
2913
2914 /*
2915  * Concurrency: @dt is write locked.
2916  */
2917 static int osd_xattr_set(const struct lu_env *env, struct dt_object *dt,
2918                          const struct lu_buf *buf, const char *name, int fl,
2919                          struct thandle *handle, struct lustre_capa *capa)
2920 {
2921         struct osd_object      *obj      = osd_dt_obj(dt);
2922         struct inode           *inode    = obj->oo_inode;
2923         struct osd_thread_info *info     = osd_oti_get(env);
2924         int                     fs_flags = 0;
2925         ENTRY;
2926
2927         LASSERT(handle != NULL);
2928
2929         /* version set is not real XATTR */
2930         if (strcmp(name, XATTR_NAME_VERSION) == 0) {
2931                 /* for version we are just using xattr API but change inode
2932                  * field instead */
2933                 LASSERT(buf->lb_len == sizeof(dt_obj_version_t));
2934                 osd_object_version_set(env, dt, buf->lb_buf);
2935                 return sizeof(dt_obj_version_t);
2936         }
2937
2938         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
2939                 return -EACCES;
2940
2941         osd_trans_exec_op(env, handle, OSD_OT_XATTR_SET);
2942         if (fl & LU_XATTR_REPLACE)
2943                 fs_flags |= XATTR_REPLACE;
2944
2945         if (fl & LU_XATTR_CREATE)
2946                 fs_flags |= XATTR_CREATE;
2947
2948         return __osd_xattr_set(info, inode, name, buf->lb_buf, buf->lb_len,
2949                                fs_flags);
2950 }
2951
2952 /*
2953  * Concurrency: @dt is read locked.
2954  */
2955 static int osd_xattr_list(const struct lu_env *env, struct dt_object *dt,
2956                           struct lu_buf *buf, struct lustre_capa *capa)
2957 {
2958         struct osd_object      *obj    = osd_dt_obj(dt);
2959         struct inode           *inode  = obj->oo_inode;
2960         struct osd_thread_info *info   = osd_oti_get(env);
2961         struct dentry          *dentry = &info->oti_obj_dentry;
2962
2963         LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
2964         LASSERT(inode->i_op != NULL && inode->i_op->listxattr != NULL);
2965         LASSERT(osd_read_locked(env, obj) || osd_write_locked(env, obj));
2966
2967         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_READ))
2968                 return -EACCES;
2969
2970         dentry->d_inode = inode;
2971         dentry->d_sb = inode->i_sb;
2972         return inode->i_op->listxattr(dentry, buf->lb_buf, buf->lb_len);
2973 }
2974
2975 static int osd_declare_xattr_del(const struct lu_env *env,
2976                                  struct dt_object *dt, const char *name,
2977                                  struct thandle *handle)
2978 {
2979         struct osd_thandle *oh;
2980         struct super_block *sb = osd_sb(osd_dev(dt->do_lu.lo_dev));
2981
2982         LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
2983         LASSERT(handle != NULL);
2984
2985         oh = container_of0(handle, struct osd_thandle, ot_super);
2986         LASSERT(oh->ot_handle == NULL);
2987
2988         osd_trans_declare_op(env, oh, OSD_OT_XATTR_SET,
2989                              osd_dto_credits_noquota[DTO_XATTR_SET]);
2990         /*
2991          * xattr del may involve inode quota change, reserve credits for
2992          * dquot_initialize()
2993          */
2994         oh->ot_credits += LDISKFS_MAXQUOTAS_INIT_BLOCKS(sb);
2995
2996         return 0;
2997 }
2998
2999 /*
3000  * Concurrency: @dt is write locked.
3001  */
3002 static int osd_xattr_del(const struct lu_env *env, struct dt_object *dt,
3003                          const char *name, struct thandle *handle,
3004                          struct lustre_capa *capa)
3005 {
3006         struct osd_object      *obj    = osd_dt_obj(dt);
3007         struct inode           *inode  = obj->oo_inode;
3008         struct osd_thread_info *info   = osd_oti_get(env);
3009         struct dentry          *dentry = &info->oti_obj_dentry;
3010         int                     rc;
3011
3012         LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
3013         LASSERT(inode->i_op != NULL && inode->i_op->removexattr != NULL);
3014         LASSERT(handle != NULL);
3015
3016         if (osd_object_auth(env, dt, capa, CAPA_OPC_META_WRITE))
3017                 return -EACCES;
3018
3019         osd_trans_exec_op(env, handle, OSD_OT_XATTR_SET);
3020
3021         ll_vfs_dq_init(inode);
3022         dentry->d_inode = inode;
3023         dentry->d_sb = inode->i_sb;
3024         rc = inode->i_op->removexattr(dentry, name);
3025         return rc;
3026 }
3027
3028 static struct obd_capa *osd_capa_get(const struct lu_env *env,
3029                                      struct dt_object *dt,
3030                                      struct lustre_capa *old, __u64 opc)
3031 {
3032         struct osd_thread_info *info = osd_oti_get(env);
3033         const struct lu_fid *fid = lu_object_fid(&dt->do_lu);
3034         struct osd_object *obj = osd_dt_obj(dt);
3035         struct osd_device *osd = osd_obj2dev(obj);
3036         struct lustre_capa_key *key = &info->oti_capa_key;
3037         struct lustre_capa *capa = &info->oti_capa;
3038         struct obd_capa *oc;
3039         struct lu_capainfo *lci;
3040         int rc;
3041         ENTRY;
3042
3043         if (!osd->od_fl_capa)
3044                 RETURN(ERR_PTR(-ENOENT));
3045
3046         LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
3047         LINVRNT(osd_invariant(obj));
3048
3049         /* renewal sanity check */
3050         if (old && osd_object_auth(env, dt, old, opc))
3051                 RETURN(ERR_PTR(-EACCES));
3052
3053         lci = lu_capainfo_get(env);
3054         if (unlikely(lci == NULL))
3055                 RETURN(ERR_PTR(-ENOENT));
3056
3057         switch (lci->lci_auth) {
3058         case LC_ID_NONE:
3059                 RETURN(NULL);
3060         case LC_ID_PLAIN:
3061                 capa->lc_uid = obj->oo_inode->i_uid;
3062                 capa->lc_gid = obj->oo_inode->i_gid;
3063                 capa->lc_flags = LC_ID_PLAIN;
3064                 break;
3065         case LC_ID_CONVERT: {
3066                 __u32 d[4], s[4];
3067
3068                 s[0] = obj->oo_inode->i_uid;
3069                 cfs_get_random_bytes(&(s[1]), sizeof(__u32));
3070                 s[2] = obj->oo_inode->i_gid;
3071                 cfs_get_random_bytes(&(s[3]), sizeof(__u32));
3072                 rc = capa_encrypt_id(d, s, key->lk_key, CAPA_HMAC_KEY_MAX_LEN);
3073                 if (unlikely(rc))
3074                         RETURN(ERR_PTR(rc));
3075
3076                 capa->lc_uid   = ((__u64)d[1] << 32) | d[0];
3077                 capa->lc_gid   = ((__u64)d[3] << 32) | d[2];
3078                 capa->lc_flags = LC_ID_CONVERT;
3079                 break;
3080         }
3081         default:
3082                 RETURN(ERR_PTR(-EINVAL));
3083         }
3084
3085         capa->lc_fid = *fid;
3086         capa->lc_opc = opc;
3087         capa->lc_flags |= osd->od_capa_alg << 24;
3088         capa->lc_timeout = osd->od_capa_timeout;
3089         capa->lc_expiry = 0;
3090
3091         oc = capa_lookup(osd->od_capa_hash, capa, 1);
3092         if (oc) {
3093                 LASSERT(!capa_is_expired(oc));
3094                 RETURN(oc);
3095         }
3096
3097         spin_lock(&capa_lock);
3098         *key = osd->od_capa_keys[1];
3099         spin_unlock(&capa_lock);
3100
3101         capa->lc_keyid = key->lk_keyid;
3102         capa->lc_expiry = cfs_time_current_sec() + osd->od_capa_timeout;
3103
3104         rc = capa_hmac(capa->lc_hmac, capa, key->lk_key);
3105         if (rc) {
3106                 DEBUG_CAPA(D_ERROR, capa, "HMAC failed: %d for", rc);
3107                 RETURN(ERR_PTR(rc));
3108         }
3109
3110         oc = capa_add(osd->od_capa_hash, capa);
3111         RETURN(oc);
3112 }
3113
3114 static int osd_object_sync(const struct lu_env *env, struct dt_object *dt)
3115 {
3116         struct osd_object       *obj    = osd_dt_obj(dt);
3117         struct inode            *inode  = obj->oo_inode;
3118         struct osd_thread_info  *info   = osd_oti_get(env);
3119         struct dentry           *dentry = &info->oti_obj_dentry;
3120         struct file             *file   = &info->oti_file;
3121         int                     rc;
3122
3123         ENTRY;
3124
3125         dentry->d_inode = inode;
3126         dentry->d_sb = inode->i_sb;
3127         file->f_dentry = dentry;
3128         file->f_mapping = inode->i_mapping;
3129         file->f_op = inode->i_fop;
3130 #ifndef HAVE_FILE_FSYNC_4ARGS
3131         mutex_lock(&inode->i_mutex);
3132 #endif
3133         rc = do_fsync(file, 0);
3134 #ifndef HAVE_FILE_FSYNC_4ARGS
3135         mutex_unlock(&inode->i_mutex);
3136 #endif
3137         RETURN(rc);
3138 }
3139
3140 static int osd_data_get(const struct lu_env *env, struct dt_object *dt,
3141                         void **data)
3142 {
3143         struct osd_object *obj = osd_dt_obj(dt);
3144         ENTRY;
3145
3146         *data = (void *)obj->oo_inode;
3147         RETURN(0);
3148 }
3149
3150 /*
3151  * Index operations.
3152  */
3153
3154 static int osd_iam_index_probe(const struct lu_env *env, struct osd_object *o,
3155                            const struct dt_index_features *feat)
3156 {
3157         struct iam_descr *descr;
3158
3159         if (osd_object_is_root(o))
3160                 return feat == &dt_directory_features;
3161
3162         LASSERT(o->oo_dir != NULL);
3163
3164         descr = o->oo_dir->od_container.ic_descr;
3165         if (feat == &dt_directory_features) {
3166                 if (descr->id_rec_size == sizeof(struct osd_fid_pack))
3167                         return 1;
3168                 else
3169                         return 0;
3170         } else {
3171                 return
3172                         feat->dif_keysize_min <= descr->id_key_size &&
3173                         descr->id_key_size <= feat->dif_keysize_max &&
3174                         feat->dif_recsize_min <= descr->id_rec_size &&
3175                         descr->id_rec_size <= feat->dif_recsize_max &&
3176                         !(feat->dif_flags & (DT_IND_VARKEY |
3177                                              DT_IND_VARREC | DT_IND_NONUNQ)) &&
3178                         ergo(feat->dif_flags & DT_IND_UPDATE,
3179                              1 /* XXX check that object (and file system) is
3180                                 * writable */);
3181         }
3182 }
3183
3184 static int osd_iam_container_init(const struct lu_env *env,
3185                                   struct osd_object *obj,
3186                                   struct osd_directory *dir)
3187 {
3188         struct iam_container *bag = &dir->od_container;
3189         int result;
3190
3191         result = iam_container_init(bag, &dir->od_descr, obj->oo_inode);
3192         if (result != 0)
3193                 return result;
3194
3195         result = iam_container_setup(bag);
3196         if (result == 0)
3197                 obj->oo_dt.do_index_ops = &osd_index_iam_ops;
3198         else
3199                 iam_container_fini(bag);
3200
3201         return result;
3202 }
3203
3204
3205 /*
3206  * Concurrency: no external locking is necessary.
3207  */
3208 static int osd_index_try(const struct lu_env *env, struct dt_object *dt,
3209                          const struct dt_index_features *feat)
3210 {
3211         int                      result;
3212         int                      skip_iam = 0;
3213         struct osd_object       *obj = osd_dt_obj(dt);
3214
3215         LINVRNT(osd_invariant(obj));
3216
3217         if (osd_object_is_root(obj)) {
3218                 dt->do_index_ops = &osd_index_ea_ops;
3219                 result = 0;
3220         } else if (feat == &dt_directory_features) {
3221                 dt->do_index_ops = &osd_index_ea_ops;
3222                 if (obj->oo_inode != NULL && S_ISDIR(obj->oo_inode->i_mode))
3223                         result = 0;
3224                 else
3225                         result = -ENOTDIR;
3226                 skip_iam = 1;
3227         } else if (unlikely(feat == &dt_otable_features)) {
3228                 dt->do_index_ops = &osd_otable_ops;
3229                 return 0;
3230         } else if (unlikely(feat == &dt_acct_features)) {
3231                 dt->do_index_ops = &osd_acct_index_ops;
3232                 result = 0;
3233                 skip_iam = 1;
3234         } else if (!osd_has_index(obj)) {
3235                 struct osd_directory *dir;
3236
3237                 OBD_ALLOC_PTR(dir);
3238                 if (dir != NULL) {
3239
3240                         spin_lock(&obj->oo_guard);
3241                         if (obj->oo_dir == NULL)
3242                                 obj->oo_dir = dir;
3243                         else
3244                                 /*
3245                                  * Concurrent thread allocated container data.
3246                                  */
3247                                 OBD_FREE_PTR(dir);
3248                         spin_unlock(&obj->oo_guard);
3249                         /*
3250                          * Now, that we have container data, serialize its
3251                          * initialization.
3252                          */
3253                         down_write(&obj->oo_ext_idx_sem);
3254                         /*
3255                          * recheck under lock.
3256                          */
3257                         if (!osd_has_index(obj))
3258                                 result = osd_iam_container_init(env, obj,
3259                                                                 obj->oo_dir);
3260                         else
3261                                 result = 0;
3262                         up_write(&obj->oo_ext_idx_sem);
3263                 } else {
3264                         result = -ENOMEM;
3265                 }
3266         } else {
3267                 result = 0;
3268         }
3269
3270         if (result == 0 && skip_iam == 0) {
3271                 if (!osd_iam_index_probe(env, obj, feat))
3272                         result = -ENOTDIR;
3273         }
3274         LINVRNT(osd_invariant(obj));
3275
3276         if (result == 0 && is_quota_glb_feat(feat) &&
3277             fid_seq(lu_object_fid(&dt->do_lu)) == FID_SEQ_QUOTA_GLB)
3278                 result = osd_quota_migration(env, dt, feat);
3279
3280         return result;
3281 }
3282
3283 static int osd_otable_it_attr_get(const struct lu_env *env,
3284                                  struct dt_object *dt,
3285                                  struct lu_attr *attr,
3286                                  struct lustre_capa *capa)
3287 {
3288         attr->la_valid = 0;
3289         return 0;
3290 }
3291
3292 static const struct dt_object_operations osd_obj_ops = {
3293         .do_read_lock         = osd_object_read_lock,
3294         .do_write_lock        = osd_object_write_lock,
3295         .do_read_unlock       = osd_object_read_unlock,
3296         .do_write_unlock      = osd_object_write_unlock,
3297         .do_write_locked      = osd_object_write_locked,
3298         .do_attr_get          = osd_attr_get,
3299         .do_declare_attr_set  = osd_declare_attr_set,
3300         .do_attr_set          = osd_attr_set,
3301         .do_ah_init           = osd_ah_init,
3302         .do_declare_create    = osd_declare_object_create,
3303         .do_create            = osd_object_create,
3304         .do_declare_destroy   = osd_declare_object_destroy,
3305         .do_destroy           = osd_object_destroy,
3306         .do_index_try         = osd_index_try,
3307         .do_declare_ref_add   = osd_declare_object_ref_add,
3308         .do_ref_add           = osd_object_ref_add,
3309         .do_declare_ref_del   = osd_declare_object_ref_del,
3310         .do_ref_del           = osd_object_ref_del,
3311         .do_xattr_get         = osd_xattr_get,
3312         .do_declare_xattr_set = osd_declare_xattr_set,
3313         .do_xattr_set         = osd_xattr_set,
3314         .do_declare_xattr_del = osd_declare_xattr_del,
3315         .do_xattr_del         = osd_xattr_del,
3316         .do_xattr_list        = osd_xattr_list,
3317         .do_capa_get          = osd_capa_get,
3318         .do_object_sync       = osd_object_sync,
3319         .do_data_get          = osd_data_get,
3320 };
3321
3322 /**
3323  * dt_object_operations for interoperability mode
3324  * (i.e. to run 2.0 mds on 1.8 disk) (b11826)
3325  */
3326 static const struct dt_object_operations osd_obj_ea_ops = {
3327         .do_read_lock         = osd_object_read_lock,
3328         .do_write_lock        = osd_object_write_lock,
3329         .do_read_unlock       = osd_object_read_unlock,
3330         .do_write_unlock      = osd_object_write_unlock,
3331         .do_write_locked      = osd_object_write_locked,
3332         .do_attr_get          = osd_attr_get,
3333         .do_declare_attr_set  = osd_declare_attr_set,
3334         .do_attr_set          = osd_attr_set,
3335         .do_ah_init           = osd_ah_init,
3336         .do_declare_create    = osd_declare_object_create,
3337         .do_create            = osd_object_ea_create,
3338         .do_declare_destroy   = osd_declare_object_destroy,
3339         .do_destroy           = osd_object_destroy,
3340         .do_index_try         = osd_index_try,
3341         .do_declare_ref_add   = osd_declare_object_ref_add,
3342         .do_ref_add           = osd_object_ref_add,
3343         .do_declare_ref_del   = osd_declare_object_ref_del,
3344         .do_ref_del           = osd_object_ref_del,
3345         .do_xattr_get         = osd_xattr_get,
3346         .do_declare_xattr_set = osd_declare_xattr_set,
3347         .do_xattr_set         = osd_xattr_set,
3348         .do_declare_xattr_del = osd_declare_xattr_del,
3349         .do_xattr_del         = osd_xattr_del,
3350         .do_xattr_list        = osd_xattr_list,
3351         .do_capa_get          = osd_capa_get,
3352         .do_object_sync       = osd_object_sync,
3353         .do_data_get          = osd_data_get,
3354 };
3355
3356 static const struct dt_object_operations osd_obj_otable_it_ops = {
3357         .do_attr_get    = osd_otable_it_attr_get,
3358         .do_index_try   = osd_index_try,
3359 };
3360
3361 static int osd_index_declare_iam_delete(const struct lu_env *env,
3362                                         struct dt_object *dt,
3363                                         const struct dt_key *key,
3364                                         struct thandle *handle)
3365 {
3366         struct osd_thandle    *oh;
3367
3368         oh = container_of0(handle, struct osd_thandle, ot_super);
3369         LASSERT(oh->ot_handle == NULL);
3370
3371         osd_trans_declare_op(env, oh, OSD_OT_DELETE,
3372                              osd_dto_credits_noquota[DTO_INDEX_DELETE]);
3373
3374         return 0;
3375 }
3376
3377 /**
3378  *      delete a (key, value) pair from index \a dt specified by \a key
3379  *
3380  *      \param  dt      osd index object
3381  *      \param  key     key for index
3382  *      \param  rec     record reference
3383  *      \param  handle  transaction handler
3384  *
3385  *      \retval  0  success
3386  *      \retval -ve   failure
3387  */
3388
3389 static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
3390                                 const struct dt_key *key,
3391                                 struct thandle *handle,
3392                                 struct lustre_capa *capa)
3393 {
3394         struct osd_thread_info *oti = osd_oti_get(env);
3395         struct osd_object      *obj = osd_dt_obj(dt);
3396         struct osd_thandle     *oh;
3397         struct iam_path_descr  *ipd;
3398         struct iam_container   *bag = &obj->oo_dir->od_container;
3399         int                     rc;
3400
3401         ENTRY;
3402
3403         LINVRNT(osd_invariant(obj));
3404         LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
3405         LASSERT(bag->ic_object == obj->oo_inode);
3406         LASSERT(handle != NULL);
3407
3408         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
3409                 RETURN(-EACCES);
3410
3411         osd_trans_exec_op(env, handle, OSD_OT_DELETE);
3412
3413         ipd = osd_idx_ipd_get(env, bag);
3414         if (unlikely(ipd == NULL))
3415                 RETURN(-ENOMEM);
3416
3417         oh = container_of0(handle, struct osd_thandle, ot_super);
3418         LASSERT(oh->ot_handle != NULL);
3419         LASSERT(oh->ot_handle->h_transaction != NULL);
3420
3421         if (fid_is_quota(lu_object_fid(&dt->do_lu))) {
3422                 /* swab quota uid/gid provided by caller */
3423                 oti->oti_quota_id = cpu_to_le64(*((__u64 *)key));
3424                 key = (const struct dt_key *)&oti->oti_quota_id;
3425         }
3426
3427         rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
3428         osd_ipd_put(env, bag, ipd);
3429         LINVRNT(osd_invariant(obj));
3430         RETURN(rc);
3431 }
3432
3433 static int osd_index_declare_ea_delete(const struct lu_env *env,
3434                                        struct dt_object *dt,
3435                                        const struct dt_key *key,
3436                                        struct thandle *handle)
3437 {
3438         struct osd_thandle *oh;
3439         struct inode       *inode;
3440         int                 rc;
3441         ENTRY;
3442
3443         LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
3444         LASSERT(handle != NULL);
3445
3446         oh = container_of0(handle, struct osd_thandle, ot_super);
3447         LASSERT(oh->ot_handle == NULL);
3448
3449         osd_trans_declare_op(env, oh, OSD_OT_DELETE,
3450                              osd_dto_credits_noquota[DTO_INDEX_DELETE]);
3451
3452         inode = osd_dt_obj(dt)->oo_inode;
3453         LASSERT(inode);
3454
3455         rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0, oh,
3456                                    osd_dt_obj(dt), true, NULL, false);
3457         RETURN(rc);
3458 }
3459
3460 static inline int osd_get_fid_from_dentry(struct ldiskfs_dir_entry_2 *de,
3461                                           struct dt_rec *fid)
3462 {
3463         struct osd_fid_pack *rec;
3464         int                  rc = -ENODATA;
3465
3466         if (de->file_type & LDISKFS_DIRENT_LUFID) {
3467                 rec = (struct osd_fid_pack *) (de->name + de->name_len + 1);
3468                 rc = osd_fid_unpack((struct lu_fid *)fid, rec);
3469         }
3470         return rc;
3471 }
3472
3473 static int osd_mdt_seq_exists(const struct lu_env *env,
3474                               struct osd_device *osd, obd_seq seq)
3475 {
3476         struct lu_seq_range     *range = &osd_oti_get(env)->oti_seq_range;
3477         struct seq_server_site  *ss = osd_seq_site(osd);
3478         int                     rc;
3479         ENTRY;
3480
3481         if (ss == NULL)
3482                 RETURN(1);
3483
3484         /* XXX: currently, each MDT only store avaible sequence on disk, and no
3485          * allocated sequences information on disk, so we have to lookup FLDB,
3486          * but it probably makes more sense also store allocated sequence
3487          * locally, so we do not need do remote FLDB lookup in OSD */
3488         rc = osd_fld_lookup(env, osd, seq, range);
3489         if (rc != 0) {
3490                 CERROR("%s: Can not lookup fld for "LPX64"\n",
3491                        osd_name(osd), seq);
3492                 RETURN(0);
3493         }
3494
3495         RETURN(ss->ss_node_id == range->lsr_index);
3496 }
3497
3498 static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
3499                           struct lu_fid *fid)
3500 {
3501         ENTRY;
3502
3503         /* FID seqs not in FLDB, must be local seq */
3504         if (unlikely(!fid_seq_in_fldb(fid_seq(fid))))
3505                 RETURN(0);
3506
3507         /* Currently only check this for FID on MDT */
3508         if (osd_mdt_seq_exists(env, osd, fid_seq(fid)))
3509                 RETURN(0);
3510
3511         RETURN(1);
3512 }
3513
3514 /**
3515  * Index delete function for interoperability mode (b11826).
3516  * It will remove the directory entry added by osd_index_ea_insert().
3517  * This entry is needed to maintain name->fid mapping.
3518  *
3519  * \param key,  key i.e. file entry to be deleted
3520  *
3521  * \retval   0, on success
3522  * \retval -ve, on error
3523  */
3524 static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
3525                                const struct dt_key *key,
3526                                struct thandle *handle,
3527                                struct lustre_capa *capa)
3528 {
3529         struct osd_object          *obj    = osd_dt_obj(dt);
3530         struct inode               *dir    = obj->oo_inode;
3531         struct dentry              *dentry;
3532         struct osd_thandle         *oh;
3533         struct ldiskfs_dir_entry_2 *de = NULL;
3534         struct buffer_head         *bh;
3535         struct htree_lock          *hlock = NULL;
3536         struct lu_fid              *fid = &osd_oti_get(env)->oti_fid;
3537         struct osd_device          *osd = osd_dev(dt->do_lu.lo_dev);
3538         int                        rc;
3539         ENTRY;
3540
3541         LINVRNT(osd_invariant(obj));
3542         LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
3543         LASSERT(handle != NULL);
3544
3545         osd_trans_exec_op(env, handle, OSD_OT_DELETE);
3546
3547         oh = container_of(handle, struct osd_thandle, ot_super);
3548         LASSERT(oh->ot_handle != NULL);
3549         LASSERT(oh->ot_handle->h_transaction != NULL);
3550
3551         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
3552                 RETURN(-EACCES);
3553
3554         ll_vfs_dq_init(dir);
3555         dentry = osd_child_dentry_get(env, obj,
3556                                       (char *)key, strlen((char *)key));
3557
3558         if (obj->oo_hl_head != NULL) {
3559                 hlock = osd_oti_get(env)->oti_hlock;
3560                 ldiskfs_htree_lock(hlock, obj->oo_hl_head,
3561                                    dir, LDISKFS_HLOCK_DEL);
3562         } else {
3563                 down_write(&obj->oo_ext_idx_sem);
3564         }
3565
3566         bh = osd_ldiskfs_find_entry(dir, &dentry->d_name, &de, NULL, hlock);
3567         if (bh) {
3568                 __u32 ino = 0;
3569
3570                 /* If this is not the ".." entry, it might be a remote DNE
3571                  * entry and  we need to check if the FID is for a remote
3572                  * MDT.  If the FID is  not in the directory entry (e.g.
3573                  * upgraded 1.8 filesystem without dirdata enabled) then
3574                  * we need to get the FID from the LMA. For a remote directory
3575                  * there HAS to be an LMA, it cannot be an IGIF inode in this
3576                  * case.
3577                  *
3578                  * Delete the entry before the agent inode in order to
3579                  * simplify error handling.  At worst an error after deleting
3580                  * the entry first might leak the agent inode afterward. The
3581                  * reverse would need filesystem abort in case of error deleting
3582                  * the entry after the agent had been removed, or leave a
3583                  * dangling entry pointing at a random inode. */
3584                 if (strcmp((char *)key, dotdot) != 0) {
3585                         LASSERT(de != NULL);
3586                         rc = osd_get_fid_from_dentry(de, (struct dt_rec *)fid);
3587                         /* If Fid is not in dentry, try to get it from LMA */
3588                         if (rc == -ENODATA) {
3589                                 struct osd_inode_id *id;
3590                                 struct inode *inode;
3591
3592                                 /* Before trying to get fid from the inode,
3593                                  * check whether the inode is valid.
3594                                  *
3595                                  * If the inode has been deleted, do not go
3596                                  * ahead to do osd_ea_fid_get, which will set
3597                                  * the inode to bad inode, which might cause
3598                                  * the inode to be deleted uncorrectly */
3599                                 inode = ldiskfs_iget(osd_sb(osd),
3600                                                      le32_to_cpu(de->inode));
3601                                 if (IS_ERR(inode)) {
3602                                         CDEBUG(D_INODE, "%s: "DFID"get inode"
3603                                                "error.\n", osd_name(osd),
3604                                                PFID(fid));
3605                                         rc = PTR_ERR(inode);
3606                                 } else {
3607                                         if (likely(inode->i_nlink != 0)) {
3608                                                 id = &osd_oti_get(env)->oti_id;
3609                                                 rc = osd_ea_fid_get(env, obj,
3610                                                         le32_to_cpu(de->inode),
3611                                                                     fid, id);
3612                                         } else {
3613                                                 CDEBUG(D_INFO, "%s: %u "DFID
3614                                                        "deleted.\n",
3615                                                        osd_name(osd),
3616                                                        le32_to_cpu(de->inode),
3617                                                        PFID(fid));
3618                                                 rc = -ESTALE;
3619                                         }
3620                                         iput(inode);
3621                                 }
3622                         }
3623                         if (rc == 0 &&
3624                             unlikely(osd_remote_fid(env, osd, fid)))
3625                                 /* Need to delete agent inode */
3626                                 ino = le32_to_cpu(de->inode);
3627                 }
3628                 rc = ldiskfs_delete_entry(oh->ot_handle, dir, de, bh);
3629                 brelse(bh);
3630                 if (rc == 0 && unlikely(ino != 0)) {
3631                         rc = osd_delete_local_agent_inode(env, osd, fid, ino,
3632                                                           oh);
3633                         if (rc != 0)
3634                                 CERROR("%s: del local inode "DFID": rc = %d\n",
3635                                        osd_name(osd), PFID(fid), rc);
3636                 }
3637         } else {
3638                 rc = -ENOENT;
3639         }
3640         if (hlock != NULL)
3641                 ldiskfs_htree_unlock(hlock);
3642         else
3643                 up_write(&obj->oo_ext_idx_sem);
3644
3645         if (rc != 0)
3646                 GOTO(out, rc);
3647
3648         /* For inode on the remote MDT, .. will point to
3649          * /Agent directory, Check whether it needs to delete
3650          * from agent directory */
3651         if (unlikely(strcmp((char *)key, dotdot) == 0)) {
3652                 rc = osd_delete_from_remote_parent(env, osd_obj2dev(obj), obj,
3653                                                    oh);
3654                 if (rc != 0 && rc != -ENOENT) {
3655                         CERROR("%s: delete agent inode "DFID": rc = %d\n",
3656                                osd_name(osd), PFID(fid), rc);
3657                 }
3658
3659                 if (rc == -ENOENT)
3660                         rc = 0;
3661
3662                 GOTO(out, rc);
3663         }
3664 out:
3665
3666         LASSERT(osd_invariant(obj));
3667         RETURN(rc);
3668 }
3669
3670 /**
3671  *      Lookup index for \a key and copy record to \a rec.
3672  *
3673  *      \param  dt      osd index object
3674  *      \param  key     key for index
3675  *      \param  rec     record reference
3676  *
3677  *      \retval  +ve  success : exact mach
3678  *      \retval  0    return record with key not greater than \a key
3679  *      \retval -ve   failure
3680  */
3681 static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
3682                                 struct dt_rec *rec, const struct dt_key *key,
3683                                 struct lustre_capa *capa)
3684 {
3685         struct osd_object      *obj = osd_dt_obj(dt);
3686         struct iam_path_descr  *ipd;
3687         struct iam_container   *bag = &obj->oo_dir->od_container;
3688         struct osd_thread_info *oti = osd_oti_get(env);
3689         struct iam_iterator    *it = &oti->oti_idx_it;
3690         struct iam_rec         *iam_rec;
3691         int                     rc;
3692
3693         ENTRY;
3694
3695         LASSERT(osd_invariant(obj));
3696         LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
3697         LASSERT(bag->ic_object == obj->oo_inode);
3698
3699         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
3700                 RETURN(-EACCES);
3701
3702         ipd = osd_idx_ipd_get(env, bag);
3703         if (IS_ERR(ipd))
3704                 RETURN(-ENOMEM);
3705
3706         /* got ipd now we can start iterator. */
3707         iam_it_init(it, bag, 0, ipd);
3708
3709         if (fid_is_quota(lu_object_fid(&dt->do_lu))) {
3710                 /* swab quota uid/gid provided by caller */
3711                 oti->oti_quota_id = cpu_to_le64(*((__u64 *)key));
3712                 key = (const struct dt_key *)&oti->oti_quota_id;
3713         }
3714
3715         rc = iam_it_get(it, (struct iam_key *)key);
3716         if (rc >= 0) {
3717                 if (S_ISDIR(obj->oo_inode->i_mode))
3718                         iam_rec = (struct iam_rec *)oti->oti_ldp;
3719                 else
3720                         iam_rec = (struct iam_rec *) rec;
3721
3722                 iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)iam_rec);
3723
3724                 if (S_ISDIR(obj->oo_inode->i_mode))
3725                         osd_fid_unpack((struct lu_fid *) rec,
3726                                        (struct osd_fid_pack *)iam_rec);
3727                 else if (fid_is_quota(lu_object_fid(&dt->do_lu)))
3728                         osd_quota_unpack(obj, rec);
3729         }
3730
3731         iam_it_put(it);
3732         iam_it_fini(it);
3733         osd_ipd_put(env, bag, ipd);
3734
3735         LINVRNT(osd_invariant(obj));
3736
3737         RETURN(rc);
3738 }
3739
3740 static int osd_index_declare_iam_insert(const struct lu_env *env,
3741                                         struct dt_object *dt,
3742                                         const struct dt_rec *rec,
3743                                         const struct dt_key *key,
3744                                         struct thandle *handle)
3745 {
3746         struct osd_thandle *oh;
3747
3748         LASSERT(handle != NULL);
3749
3750         oh = container_of0(handle, struct osd_thandle, ot_super);
3751         LASSERT(oh->ot_handle == NULL);
3752
3753         osd_trans_declare_op(env, oh, OSD_OT_INSERT,
3754                              osd_dto_credits_noquota[DTO_INDEX_INSERT]);
3755
3756         return 0;
3757 }
3758
3759 /**
3760  *      Inserts (key, value) pair in \a dt index object.
3761  *
3762  *      \param  dt      osd index object
3763  *      \param  key     key for index
3764  *      \param  rec     record reference
3765  *      \param  th      transaction handler
3766  *
3767  *      \retval  0  success
3768  *      \retval -ve failure
3769  */
3770 static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
3771                                 const struct dt_rec *rec,
3772                                 const struct dt_key *key, struct thandle *th,
3773                                 struct lustre_capa *capa, int ignore_quota)
3774 {
3775         struct osd_object     *obj = osd_dt_obj(dt);
3776         struct iam_path_descr *ipd;
3777         struct osd_thandle    *oh;
3778         struct iam_container  *bag = &obj->oo_dir->od_container;
3779         struct osd_thread_info *oti = osd_oti_get(env);
3780         struct iam_rec         *iam_rec;
3781         int                     rc;
3782
3783         ENTRY;
3784
3785         LINVRNT(osd_invariant(obj));
3786         LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
3787         LASSERT(bag->ic_object == obj->oo_inode);
3788         LASSERT(th != NULL);
3789
3790         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
3791                 RETURN(-EACCES);
3792
3793         osd_trans_exec_op(env, th, OSD_OT_INSERT);
3794
3795         ipd = osd_idx_ipd_get(env, bag);
3796         if (unlikely(ipd == NULL))
3797                 RETURN(-ENOMEM);
3798
3799         oh = container_of0(th, struct osd_thandle, ot_super);
3800         LASSERT(oh->ot_handle != NULL);
3801         LASSERT(oh->ot_handle->h_transaction != NULL);
3802         if (S_ISDIR(obj->oo_inode->i_mode)) {
3803                 iam_rec = (struct iam_rec *)oti->oti_ldp;
3804                 osd_fid_pack((struct osd_fid_pack *)iam_rec, rec, &oti->oti_fid);
3805         } else if (fid_is_quota(lu_object_fid(&dt->do_lu))) {
3806                 /* pack quota uid/gid */
3807                 oti->oti_quota_id = cpu_to_le64(*((__u64 *)key));
3808                 key = (const struct dt_key *)&oti->oti_quota_id;
3809                 /* pack quota record */
3810                 rec = osd_quota_pack(obj, rec, &oti->oti_quota_rec);
3811                 iam_rec = (struct iam_rec *)rec;
3812         } else {
3813                 iam_rec = (struct iam_rec *)rec;
3814         }
3815
3816         rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
3817                         iam_rec, ipd);
3818         osd_ipd_put(env, bag, ipd);
3819         LINVRNT(osd_invariant(obj));
3820         RETURN(rc);
3821 }
3822
3823 /**
3824  * Calls ldiskfs_add_entry() to add directory entry
3825  * into the directory. This is required for
3826  * interoperability mode (b11826)
3827  *
3828  * \retval   0, on success
3829  * \retval -ve, on error
3830  */
3831 static int __osd_ea_add_rec(struct osd_thread_info *info,
3832                             struct osd_object *pobj, struct inode  *cinode,
3833                             const char *name, const struct dt_rec *fid,
3834                             struct htree_lock *hlock, struct thandle *th)
3835 {
3836         struct ldiskfs_dentry_param *ldp;
3837         struct dentry               *child;
3838         struct osd_thandle          *oth;
3839         int                          rc;
3840
3841         oth = container_of(th, struct osd_thandle, ot_super);
3842         LASSERT(oth->ot_handle != NULL);
3843         LASSERT(oth->ot_handle->h_transaction != NULL);
3844         LASSERT(pobj->oo_inode);
3845
3846         ldp = (struct ldiskfs_dentry_param *)info->oti_ldp;
3847         if (unlikely(pobj->oo_inode ==
3848                      osd_sb(osd_obj2dev(pobj))->s_root->d_inode))
3849                 ldp->edp_magic = 0;
3850         else
3851                 osd_get_ldiskfs_dirent_param(ldp, fid);
3852         child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name));
3853         child->d_fsdata = (void *)ldp;
3854         ll_vfs_dq_init(pobj->oo_inode);
3855         rc = osd_ldiskfs_add_entry(oth->ot_handle, child, cinode, hlock);
3856
3857         RETURN(rc);
3858 }
3859
3860 /**
3861  * Calls ldiskfs_add_dot_dotdot() to add dot and dotdot entries
3862  * into the directory.Also sets flags into osd object to
3863  * indicate dot and dotdot are created. This is required for
3864  * interoperability mode (b11826)
3865  *
3866  * \param dir   directory for dot and dotdot fixup.
3867  * \param obj   child object for linking
3868  *
3869  * \retval   0, on success
3870  * \retval -ve, on error
3871  */
3872 static int osd_add_dot_dotdot(struct osd_thread_info *info,
3873                               struct osd_object *dir,
3874                               struct inode  *parent_dir, const char *name,
3875                               const struct dt_rec *dot_fid,
3876                               const struct dt_rec *dot_dot_fid,
3877                               struct thandle *th)
3878 {
3879         struct inode                *inode = dir->oo_inode;
3880         struct osd_thandle          *oth;
3881         int result = 0;
3882
3883         oth = container_of(th, struct osd_thandle, ot_super);
3884         LASSERT(oth->ot_handle->h_transaction != NULL);
3885         LASSERT(S_ISDIR(dir->oo_inode->i_mode));
3886
3887         if (strcmp(name, dot) == 0) {
3888                 if (dir->oo_compat_dot_created) {
3889                         result = -EEXIST;
3890                 } else {
3891                         LASSERT(inode == parent_dir);
3892                         dir->oo_compat_dot_created = 1;
3893                         result = 0;
3894                 }
3895         } else if (strcmp(name, dotdot) == 0) {
3896                 if (!dir->oo_compat_dot_created)
3897                         return -EINVAL;
3898                 /* in case of rename, dotdot is already created */
3899                 if (dir->oo_compat_dotdot_created) {
3900                         return __osd_ea_add_rec(info, dir, parent_dir, name,
3901                                                 dot_dot_fid, NULL, th);
3902                 }
3903
3904                 result = osd_add_dot_dotdot_internal(info, dir->oo_inode,
3905                                                 parent_dir, dot_fid,
3906                                                 dot_dot_fid, oth);
3907                 if (result == 0)
3908                         dir->oo_compat_dotdot_created = 1;
3909         }
3910
3911         return result;
3912 }
3913
3914
3915 /**
3916  * It will call the appropriate osd_add* function and return the
3917  * value, return by respective functions.
3918  */
3919 static int osd_ea_add_rec(const struct lu_env *env, struct osd_object *pobj,
3920                           struct inode *cinode, const char *name,
3921                           const struct dt_rec *fid, struct thandle *th)
3922 {
3923         struct osd_thread_info *info   = osd_oti_get(env);
3924         struct htree_lock      *hlock;
3925         int                     rc;
3926
3927         hlock = pobj->oo_hl_head != NULL ? info->oti_hlock : NULL;
3928
3929         if (name[0] == '.' && (name[1] == '\0' || (name[1] == '.' &&
3930                                                    name[2] =='\0'))) {
3931                 if (hlock != NULL) {
3932                         ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
3933                                            pobj->oo_inode, 0);
3934                 } else {
3935                         down_write(&pobj->oo_ext_idx_sem);
3936                 }
3937                 rc = osd_add_dot_dotdot(info, pobj, cinode, name,
3938                      (struct dt_rec *)lu_object_fid(&pobj->oo_dt.do_lu),
3939                                         fid, th);
3940         } else {
3941                 if (hlock != NULL) {
3942                         ldiskfs_htree_lock(hlock, pobj->oo_hl_head,
3943                                            pobj->oo_inode, LDISKFS_HLOCK_ADD);
3944                 } else {
3945                         down_write(&pobj->oo_ext_idx_sem);
3946                 }
3947
3948                 if (OBD_FAIL_CHECK(OBD_FAIL_FID_INDIR)) {
3949                         struct lu_fid *tfid = &info->oti_fid;
3950
3951                         *tfid = *(const struct lu_fid *)fid;
3952                         tfid->f_ver = ~0;
3953                         rc = __osd_ea_add_rec(info, pobj, cinode, name,
3954                                               (const struct dt_rec *)tfid,
3955                                               hlock, th);
3956                 } else {
3957                         rc = __osd_ea_add_rec(info, pobj, cinode, name, fid,
3958                                               hlock, th);
3959                 }
3960         }
3961         if (hlock != NULL)
3962                 ldiskfs_htree_unlock(hlock);
3963         else
3964                 up_write(&pobj->oo_ext_idx_sem);
3965
3966         return rc;
3967 }
3968
3969 static void
3970 osd_consistency_check(struct osd_thread_info *oti, struct osd_device *dev,
3971                       struct osd_idmap_cache *oic)
3972 {
3973         struct osd_scrub    *scrub = &dev->od_scrub;
3974         struct lu_fid       *fid   = &oic->oic_fid;
3975         struct osd_inode_id *id    = &oti->oti_id;
3976         int                  once  = 0;
3977         int                  rc;
3978         ENTRY;
3979
3980         if (!fid_is_norm(fid) && !fid_is_igif(fid))
3981                 RETURN_EXIT;
3982
3983 again:
3984         rc = osd_oi_lookup(oti, dev, fid, id, OI_CHECK_FLD);
3985         if (rc != 0 && rc != -ENOENT)
3986                 RETURN_EXIT;
3987
3988         if (rc == 0 && osd_id_eq(id, &oic->oic_lid))
3989                 RETURN_EXIT;
3990
3991         if (thread_is_running(&scrub->os_thread)) {
3992                 rc = osd_oii_insert(dev, oic, rc == -ENOENT);
3993                 /* There is race condition between osd_oi_lookup and OI scrub.
3994                  * The OI scrub finished just after osd_oi_lookup() failure.
3995                  * Under such case, it is unnecessary to trigger OI scrub again,
3996                  * but try to call osd_oi_lookup() again. */
3997                 if (unlikely(rc == -EAGAIN))
3998                         goto again;
3999
4000                 RETURN_EXIT;
4001         }
4002
4003         if (!dev->od_noscrub && ++once == 1) {
4004                 rc = osd_scrub_start(dev);
4005                 LCONSOLE_WARN("%.16s: trigger OI scrub by RPC for "DFID
4006                               ", rc = %d [2]\n",
4007                               LDISKFS_SB(osd_sb(dev))->s_es->s_volume_name,
4008                               PFID(fid), rc);
4009                 if (rc == 0)
4010                         goto again;
4011         }
4012
4013         EXIT;
4014 }
4015
4016 static int osd_fail_fid_lookup(struct osd_thread_info *oti,
4017                                struct osd_device *dev,
4018                                struct osd_idmap_cache *oic,
4019                                struct lu_fid *fid, __u32 ino)
4020 {
4021         struct lustre_mdt_attrs *lma   = &oti->oti_mdt_attrs;
4022         struct inode            *inode;
4023         int                      rc;
4024
4025         osd_id_gen(&oic->oic_lid, ino, OSD_OII_NOGEN);
4026         inode = osd_iget(oti, dev, &oic->oic_lid);
4027         if (IS_ERR(inode)) {
4028                 fid_zero(&oic->oic_fid);
4029                 return PTR_ERR(inode);
4030         }
4031
4032         rc = osd_get_lma(oti, inode, &oti->oti_obj_dentry, lma);
4033         iput(inode);
4034         if (rc != 0)
4035                 fid_zero(&oic->oic_fid);
4036         else
4037                 *fid = oic->oic_fid = lma->lma_self_fid;
4038         return rc;
4039 }
4040
4041 int osd_add_oi_cache(struct osd_thread_info *info, struct osd_device *osd,
4042                      struct osd_inode_id *id, const struct lu_fid *fid)
4043 {
4044         CDEBUG(D_INODE, "add "DFID" %u:%u to info %p\n", PFID(fid),
4045                id->oii_ino, id->oii_gen, info);
4046         info->oti_cache.oic_lid = *id;
4047         info->oti_cache.oic_fid = *fid;
4048         info->oti_cache.oic_dev = osd;
4049
4050         return 0;
4051 }
4052
4053 /**
4054  * Calls ->lookup() to find dentry. From dentry get inode and
4055  * read inode's ea to get fid. This is required for  interoperability
4056  * mode (b11826)
4057  *
4058  * \retval   0, on success
4059  * \retval -ve, on error
4060  */
4061 static int osd_ea_lookup_rec(const struct lu_env *env, struct osd_object *obj,
4062                              struct dt_rec *rec, const struct dt_key *key)
4063 {
4064         struct inode               *dir    = obj->oo_inode;
4065         struct dentry              *dentry;
4066         struct ldiskfs_dir_entry_2 *de;
4067         struct buffer_head         *bh;
4068         struct lu_fid              *fid = (struct lu_fid *) rec;
4069         struct htree_lock          *hlock = NULL;
4070         int                         ino;
4071         int                         rc;
4072         ENTRY;
4073
4074         LASSERT(dir->i_op != NULL && dir->i_op->lookup != NULL);
4075
4076         dentry = osd_child_dentry_get(env, obj,
4077                                       (char *)key, strlen((char *)key));
4078
4079         if (obj->oo_hl_head != NULL) {
4080                 hlock = osd_oti_get(env)->oti_hlock;
4081                 ldiskfs_htree_lock(hlock, obj->oo_hl_head,
4082                                    dir, LDISKFS_HLOCK_LOOKUP);
4083         } else {
4084                 down_read(&obj->oo_ext_idx_sem);
4085         }
4086
4087         bh = osd_ldiskfs_find_entry(dir, &dentry->d_name, &de, NULL, hlock);
4088         if (bh) {
4089                 struct osd_thread_info *oti = osd_oti_get(env);
4090                 struct osd_inode_id *id = &oti->oti_id;
4091                 struct osd_idmap_cache *oic = &oti->oti_cache;
4092                 struct osd_device *dev = osd_obj2dev(obj);
4093                 struct osd_scrub *scrub = &dev->od_scrub;
4094                 struct scrub_file *sf = &scrub->os_file;
4095
4096                 ino = le32_to_cpu(de->inode);
4097                 if (OBD_FAIL_CHECK(OBD_FAIL_FID_LOOKUP)) {
4098                         brelse(bh);
4099                         rc = osd_fail_fid_lookup(oti, dev, oic, fid, ino);
4100                         GOTO(out, rc);
4101                 }
4102
4103                 rc = osd_get_fid_from_dentry(de, rec);
4104
4105                 /* done with de, release bh */
4106                 brelse(bh);
4107                 if (rc != 0)
4108                         rc = osd_ea_fid_get(env, obj, ino, fid, id);
4109                 else
4110                         osd_id_gen(id, ino, OSD_OII_NOGEN);
4111                 if (rc != 0) {
4112                         fid_zero(&oic->oic_fid);
4113                         GOTO(out, rc);
4114                 }
4115
4116                 if (osd_remote_fid(env, dev, fid))
4117                         GOTO(out, rc = 0);
4118
4119                 rc = osd_add_oi_cache(osd_oti_get(env), osd_obj2dev(obj), id,
4120                                       fid);
4121                 if (rc != 0)
4122                         GOTO(out, rc);
4123                 if ((scrub->os_pos_current <= ino) &&
4124                     ((sf->sf_flags & SF_INCONSISTENT) ||
4125                      (sf->sf_flags & SF_UPGRADE && fid_is_igif(fid)) ||
4126                      ldiskfs_test_bit(osd_oi_fid2idx(dev, fid),
4127                                       sf->sf_oi_bitmap)))
4128                         osd_consistency_check(oti, dev, oic);
4129         } else {
4130                 rc = -ENOENT;
4131         }
4132
4133         GOTO(out, rc);
4134
4135 out:
4136         if (hlock != NULL)
4137                 ldiskfs_htree_unlock(hlock);
4138         else
4139                 up_read(&obj->oo_ext_idx_sem);
4140         return rc;
4141 }
4142
4143 /**
4144  * Find the osd object for given fid.
4145  *
4146  * \param fid need to find the osd object having this fid
4147  *
4148  * \retval osd_object on success
4149  * \retval        -ve on error
4150  */
4151 struct osd_object *osd_object_find(const struct lu_env *env,
4152                                    struct dt_object *dt,
4153                                    const struct lu_fid *fid)
4154 {
4155         struct lu_device  *ludev = dt->do_lu.lo_dev;
4156         struct osd_object *child = NULL;
4157         struct lu_object  *luch;
4158         struct lu_object  *lo;
4159
4160         /*
4161          * at this point topdev might not exist yet
4162          * (i.e. MGS is preparing profiles). so we can
4163          * not rely on topdev and instead lookup with
4164          * our device passed as topdev. this can't work
4165          * if the object isn't cached yet (as osd doesn't
4166          * allocate lu_header). IOW, the object must be
4167          * in the cache, otherwise lu_object_alloc() crashes
4168          * -bzzz
4169          */
4170         luch = lu_object_find_at(env, ludev, fid, NULL);
4171         if (!IS_ERR(luch)) {
4172                 if (lu_object_exists(luch)) {
4173                         lo = lu_object_locate(luch->lo_header, ludev->ld_type);
4174                         if (lo != NULL)
4175                                 child = osd_obj(lo);
4176                         else
4177                                 LU_OBJECT_DEBUG(D_ERROR, env, luch,
4178                                                 "lu_object can't be located"
4179                                                 DFID"\n", PFID(fid));
4180
4181                         if (child == NULL) {
4182                                 lu_object_put(env, luch);
4183                                 CERROR("Unable to get osd_object\n");
4184                                 child = ERR_PTR(-ENOENT);
4185                         }
4186                 } else {
4187                         LU_OBJECT_DEBUG(D_ERROR, env, luch,
4188                                         "lu_object does not exists "DFID"\n",
4189                                         PFID(fid));
4190                         lu_object_put(env, luch);
4191                         child = ERR_PTR(-ENOENT);
4192                 }
4193         } else {
4194                 child = ERR_CAST(luch);
4195         }
4196
4197         return child;
4198 }
4199
4200 /**
4201  * Put the osd object once done with it.
4202  *
4203  * \param obj osd object that needs to be put
4204  */
4205 static inline void osd_object_put(const struct lu_env *env,
4206                                   struct osd_object *obj)
4207 {
4208         lu_object_put(env, &obj->oo_dt.do_lu);
4209 }
4210
4211 static int osd_index_declare_ea_insert(const struct lu_env *env,
4212                                        struct dt_object *dt,
4213                                        const struct dt_rec *rec,
4214                                        const struct dt_key *key,
4215                                        struct thandle *handle)
4216 {
4217         struct osd_thandle      *oh;
4218         struct osd_device       *osd   = osd_dev(dt->do_lu.lo_dev);
4219         struct lu_fid           *fid = (struct lu_fid *)rec;
4220         int                     rc;
4221         ENTRY;
4222
4223         LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
4224         LASSERT(handle != NULL);
4225
4226         oh = container_of0(handle, struct osd_thandle, ot_super);
4227         LASSERT(oh->ot_handle == NULL);
4228
4229         osd_trans_declare_op(env, oh, OSD_OT_INSERT,
4230                              osd_dto_credits_noquota[DTO_INDEX_INSERT]);
4231
4232         if (osd_dt_obj(dt)->oo_inode == NULL) {
4233                 const char *name  = (const char *)key;
4234                 /* Object is not being created yet. Only happens when
4235                  *     1. declare directory create
4236                  *     2. declare insert .
4237                  *     3. declare insert ..
4238                  */
4239                 LASSERT(strcmp(name, dotdot) == 0 || strcmp(name, dot) == 0);
4240         } else {
4241                 struct inode *inode = osd_dt_obj(dt)->oo_inode;
4242
4243                 /* We ignore block quota on meta pool (MDTs), so needn't
4244                  * calculate how many blocks will be consumed by this index
4245                  * insert */
4246                 rc = osd_declare_inode_qid(env, inode->i_uid, inode->i_gid, 0,
4247                                            oh, osd_dt_obj(dt), true, NULL,
4248                                            false);
4249         }
4250
4251         if (fid == NULL)
4252                 RETURN(0);
4253
4254         rc = osd_remote_fid(env, osd, fid);
4255         if (rc <= 0)
4256                 RETURN(rc);
4257
4258         rc = 0;
4259
4260         osd_trans_declare_op(env, oh, OSD_OT_CREATE,
4261                              osd_dto_credits_noquota[DTO_OBJECT_CREATE]);
4262         osd_trans_declare_op(env, oh, OSD_OT_INSERT,
4263                              osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1);
4264         osd_trans_declare_op(env, oh, OSD_OT_INSERT,
4265                              osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1);
4266
4267         RETURN(rc);
4268 }
4269
4270 /**
4271  * Index add function for interoperability mode (b11826).
4272  * It will add the directory entry.This entry is needed to
4273  * maintain name->fid mapping.
4274  *
4275  * \param key it is key i.e. file entry to be inserted
4276  * \param rec it is value of given key i.e. fid
4277  *
4278  * \retval   0, on success
4279  * \retval -ve, on error
4280  */
4281 static int osd_index_ea_insert(const struct lu_env *env, struct dt_object *dt,
4282                                const struct dt_rec *rec,
4283                                const struct dt_key *key, struct thandle *th,
4284                                struct lustre_capa *capa, int ignore_quota)
4285 {
4286         struct osd_object       *obj = osd_dt_obj(dt);
4287         struct osd_device       *osd = osd_dev(dt->do_lu.lo_dev);
4288         struct lu_fid           *fid = (struct lu_fid *) rec;
4289         const char              *name = (const char *)key;
4290         struct osd_thread_info  *oti   = osd_oti_get(env);
4291         struct osd_inode_id     *id    = &oti->oti_id;
4292         struct inode            *child_inode = NULL;
4293         struct osd_object       *child = NULL;
4294         int                     rc;
4295         ENTRY;
4296
4297         LASSERT(osd_invariant(obj));
4298         LASSERT(dt_object_exists(dt) && !dt_object_remote(dt));
4299         LASSERT(th != NULL);
4300
4301         osd_trans_exec_op(env, th, OSD_OT_INSERT);
4302
4303         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
4304                 RETURN(-EACCES);
4305
4306         LASSERTF(fid_is_sane(fid), "fid"DFID" is insane!", PFID(fid));
4307
4308         rc = osd_remote_fid(env, osd, fid);
4309         if (rc < 0) {
4310                 CERROR("%s: Can not find object "DFID" rc %d\n",
4311                        osd_name(osd), PFID(fid), rc);
4312                 RETURN(rc);
4313         }
4314
4315         if (rc == 1) {
4316                 /* Insert remote entry */
4317                 if (strcmp(name, dotdot) == 0 && strlen(name) == 2) {
4318                         struct osd_mdobj_map    *omm = osd->od_mdt_map;
4319                         struct osd_thandle      *oh;
4320
4321                         /* If parent on remote MDT, we need put this object
4322                          * under AGENT */
4323                         oh = container_of(th, typeof(*oh), ot_super);
4324                         rc = osd_add_to_remote_parent(env, osd, obj, oh);
4325                         if (rc != 0) {
4326                                 CERROR("%s: add "DFID" error: rc = %d\n",
4327                                        osd_name(osd),
4328                                        PFID(lu_object_fid(&dt->do_lu)), rc);
4329                                 RETURN(rc);
4330                         }
4331
4332                         child_inode = igrab(omm->omm_remote_parent->d_inode);
4333                 } else {
4334                         child_inode = osd_create_local_agent_inode(env, osd,
4335                                                                    obj, fid,
4336                                                                    th);
4337                         if (IS_ERR(child_inode))
4338                                 RETURN(PTR_ERR(child_inode));
4339                 }
4340         } else {
4341                 /* Insert local entry */
4342                 child = osd_object_find(env, dt, fid);
4343                 if (IS_ERR(child)) {
4344                         CERROR("%s: Can not find object "DFID"%u:%u: rc = %d\n",
4345                                osd_name(osd), PFID(fid),
4346                                id->oii_ino, id->oii_gen,
4347                                (int)PTR_ERR(child));
4348                         RETURN(PTR_ERR(child));
4349                 }
4350                 child_inode = igrab(child->oo_inode);
4351         }
4352
4353         rc = osd_ea_add_rec(env, obj, child_inode, name, rec, th);
4354
4355         iput(child_inode);
4356         if (child != NULL)
4357                 osd_object_put(env, child);
4358         LASSERT(osd_invariant(obj));
4359         RETURN(rc);
4360 }
4361
4362 /**
4363  *  Initialize osd Iterator for given osd index object.
4364  *
4365  *  \param  dt      osd index object
4366  */
4367
4368 static struct dt_it *osd_it_iam_init(const struct lu_env *env,
4369                                      struct dt_object *dt,
4370                                      __u32 unused,
4371                                      struct lustre_capa *capa)
4372 {
4373         struct osd_it_iam      *it;
4374         struct osd_thread_info *oti = osd_oti_get(env);
4375         struct osd_object      *obj = osd_dt_obj(dt);
4376         struct lu_object       *lo  = &dt->do_lu;
4377         struct iam_path_descr  *ipd;
4378         struct iam_container   *bag = &obj->oo_dir->od_container;
4379
4380         LASSERT(lu_object_exists(lo));
4381
4382         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
4383                 return ERR_PTR(-EACCES);
4384
4385         it = &oti->oti_it;
4386         ipd = osd_it_ipd_get(env, bag);
4387         if (likely(ipd != NULL)) {
4388                 it->oi_obj = obj;
4389                 it->oi_ipd = ipd;
4390                 lu_object_get(lo);
4391                 iam_it_init(&it->oi_it, bag, IAM_IT_MOVE, ipd);
4392                 return (struct dt_it *)it;
4393         }
4394         return ERR_PTR(-ENOMEM);
4395 }
4396
4397 /**
4398  * free given Iterator.
4399  */
4400
4401 static void osd_it_iam_fini(const struct lu_env *env, struct dt_it *di)
4402 {
4403         struct osd_it_iam *it = (struct osd_it_iam *)di;
4404         struct osd_object *obj = it->oi_obj;
4405
4406         iam_it_fini(&it->oi_it);
4407         osd_ipd_put(env, &obj->oo_dir->od_container, it->oi_ipd);
4408         lu_object_put(env, &obj->oo_dt.do_lu);
4409 }
4410
4411 /**
4412  *  Move Iterator to record specified by \a key
4413  *
4414  *  \param  di      osd iterator
4415  *  \param  key     key for index
4416  *
4417  *  \retval +ve  di points to record with least key not larger than key
4418  *  \retval  0   di points to exact matched key
4419  *  \retval -ve  failure
4420  */
4421
4422 static int osd_it_iam_get(const struct lu_env *env,
4423                           struct dt_it *di, const struct dt_key *key)
4424 {
4425         struct osd_thread_info  *oti = osd_oti_get(env);
4426         struct osd_it_iam       *it = (struct osd_it_iam *)di;
4427
4428         if (fid_is_quota(lu_object_fid(&it->oi_obj->oo_dt.do_lu))) {
4429                 /* swab quota uid/gid */
4430                 oti->oti_quota_id = cpu_to_le64(*((__u64 *)key));
4431                 key = (struct dt_key *)&oti->oti_quota_id;
4432         }
4433
4434         return iam_it_get(&it->oi_it, (const struct iam_key *)key);
4435 }
4436
4437 /**
4438  *  Release Iterator
4439  *
4440  *  \param  di      osd iterator
4441  */
4442 static void osd_it_iam_put(const struct lu_env *env, struct dt_it *di)
4443 {
4444         struct osd_it_iam *it = (struct osd_it_iam *)di;
4445
4446         iam_it_put(&it->oi_it);
4447 }
4448
4449 /**
4450  *  Move iterator by one record
4451  *
4452  *  \param  di      osd iterator
4453  *
4454  *  \retval +1   end of container reached
4455  *  \retval  0   success
4456  *  \retval -ve  failure
4457  */
4458
4459 static int osd_it_iam_next(const struct lu_env *env, struct dt_it *di)
4460 {
4461         struct osd_it_iam *it = (struct osd_it_iam *)di;
4462
4463         return iam_it_next(&it->oi_it);
4464 }
4465
4466 /**
4467  * Return pointer to the key under iterator.
4468  */
4469
4470 static struct dt_key *osd_it_iam_key(const struct lu_env *env,
4471                                  const struct dt_it *di)
4472 {
4473         struct osd_thread_info *oti = osd_oti_get(env);
4474         struct osd_it_iam      *it = (struct osd_it_iam *)di;
4475         struct osd_object      *obj = it->oi_obj;
4476         struct dt_key          *key;
4477
4478         key = (struct dt_key *)iam_it_key_get(&it->oi_it);
4479
4480         if (!IS_ERR(key) && fid_is_quota(lu_object_fid(&obj->oo_dt.do_lu))) {
4481                 /* swab quota uid/gid */
4482                 oti->oti_quota_id = le64_to_cpu(*((__u64 *)key));
4483                 key = (struct dt_key *)&oti->oti_quota_id;
4484         }
4485
4486         return key;
4487 }
4488
4489 /**
4490  * Return size of key under iterator (in bytes)
4491  */
4492
4493 static int osd_it_iam_key_size(const struct lu_env *env, const struct dt_it *di)
4494 {
4495         struct osd_it_iam *it = (struct osd_it_iam *)di;
4496
4497         return iam_it_key_size(&it->oi_it);
4498 }
4499
4500 static inline void
4501 osd_it_append_attrs(struct lu_dirent *ent, int len, __u16 type)
4502 {
4503         /* check if file type is required */
4504         if (ent->lde_attrs & LUDA_TYPE) {
4505                 struct luda_type *lt;
4506                 int align = sizeof(*lt) - 1;
4507
4508                 len = (len + align) & ~align;
4509                 lt = (struct luda_type *)(ent->lde_name + len);
4510                 lt->lt_type = cpu_to_le16(DTTOIF(type));
4511         }
4512
4513         ent->lde_attrs = cpu_to_le32(ent->lde_attrs);
4514 }
4515
4516 /**
4517  * build lu direct from backend fs dirent.
4518  */
4519
4520 static inline void
4521 osd_it_pack_dirent(struct lu_dirent *ent, struct lu_fid *fid, __u64 offset,
4522                    char *name, __u16 namelen, __u16 type, __u32 attr)
4523 {
4524         ent->lde_attrs = attr | LUDA_FID;
4525         fid_cpu_to_le(&ent->lde_fid, fid);
4526
4527         ent->lde_hash = cpu_to_le64(offset);
4528         ent->lde_reclen = cpu_to_le16(lu_dirent_calc_size(namelen, attr));
4529
4530         strncpy(ent->lde_name, name, namelen);
4531         ent->lde_name[namelen] = '\0';
4532         ent->lde_namelen = cpu_to_le16(namelen);
4533
4534         /* append lustre attributes */
4535         osd_it_append_attrs(ent, namelen, type);
4536 }
4537
4538 /**
4539  * Return pointer to the record under iterator.
4540  */
4541 static int osd_it_iam_rec(const struct lu_env *env,
4542                           const struct dt_it *di,
4543                           struct dt_rec *dtrec, __u32 attr)
4544 {
4545         struct osd_it_iam      *it   = (struct osd_it_iam *)di;
4546         struct osd_thread_info *info = osd_oti_get(env);
4547         ENTRY;
4548
4549         if (S_ISDIR(it->oi_obj->oo_inode->i_mode)) {
4550                 const struct osd_fid_pack *rec;
4551                 struct lu_fid             *fid = &info->oti_fid;
4552                 struct lu_dirent          *lde = (struct lu_dirent *)dtrec;
4553                 char                      *name;
4554                 int                        namelen;
4555                 __u64                      hash;
4556                 int                        rc;
4557
4558                 name = (char *)iam_it_key_get(&it->oi_it);
4559                 if (IS_ERR(name))
4560                         RETURN(PTR_ERR(name));
4561
4562                 namelen = iam_it_key_size(&it->oi_it);
4563
4564                 rec = (const struct osd_fid_pack *)iam_it_rec_get(&it->oi_it);
4565                 if (IS_ERR(rec))
4566                         RETURN(PTR_ERR(rec));
4567
4568                 rc = osd_fid_unpack(fid, rec);
4569                 if (rc)
4570                         RETURN(rc);
4571
4572                 hash = iam_it_store(&it->oi_it);
4573
4574                 /* IAM does not store object type in IAM index (dir) */
4575                 osd_it_pack_dirent(lde, fid, hash, name, namelen,
4576                                    0, LUDA_FID);
4577         } else if (fid_is_quota(lu_object_fid(&it->oi_obj->oo_dt.do_lu))) {
4578                 iam_reccpy(&it->oi_it.ii_path.ip_leaf,
4579                            (struct iam_rec *)dtrec);
4580                 osd_quota_unpack(it->oi_obj, dtrec);
4581         } else {
4582                 iam_reccpy(&it->oi_it.ii_path.ip_leaf,
4583                            (struct iam_rec *)dtrec);
4584         }
4585
4586         RETURN(0);
4587 }
4588
4589 /**
4590  * Returns cookie for current Iterator position.
4591  */
4592 static __u64 osd_it_iam_store(const struct lu_env *env, const struct dt_it *di)
4593 {
4594         struct osd_it_iam *it = (struct osd_it_iam *)di;
4595
4596         return iam_it_store(&it->oi_it);
4597 }
4598
4599 /**
4600  * Restore iterator from cookie.
4601  *
4602  * \param  di      osd iterator
4603  * \param  hash    Iterator location cookie
4604  *
4605  * \retval +ve  di points to record with least key not larger than key.
4606  * \retval  0   di points to exact matched key
4607  * \retval -ve  failure
4608  */
4609
4610 static int osd_it_iam_load(const struct lu_env *env,
4611                            const struct dt_it *di, __u64 hash)
4612 {
4613         struct osd_it_iam *it = (struct osd_it_iam *)di;
4614
4615         return iam_it_load(&it->oi_it, hash);
4616 }
4617
4618 static const struct dt_index_operations osd_index_iam_ops = {
4619         .dio_lookup         = osd_index_iam_lookup,
4620         .dio_declare_insert = osd_index_declare_iam_insert,
4621         .dio_insert         = osd_index_iam_insert,
4622         .dio_declare_delete = osd_index_declare_iam_delete,
4623         .dio_delete         = osd_index_iam_delete,
4624         .dio_it     = {
4625                 .init     = osd_it_iam_init,
4626                 .fini     = osd_it_iam_fini,
4627                 .get      = osd_it_iam_get,
4628                 .put      = osd_it_iam_put,
4629                 .next     = osd_it_iam_next,
4630                 .key      = osd_it_iam_key,
4631                 .key_size = osd_it_iam_key_size,
4632                 .rec      = osd_it_iam_rec,
4633                 .store    = osd_it_iam_store,
4634                 .load     = osd_it_iam_load
4635         }
4636 };
4637
4638
4639 /**
4640  * Creates or initializes iterator context.
4641  *
4642  * \retval struct osd_it_ea, iterator structure on success
4643  *
4644  */
4645 static struct dt_it *osd_it_ea_init(const struct lu_env *env,
4646                                     struct dt_object *dt,
4647                                     __u32 attr,
4648                                     struct lustre_capa *capa)
4649 {
4650         struct osd_object       *obj  = osd_dt_obj(dt);
4651         struct osd_thread_info  *info = osd_oti_get(env);
4652         struct osd_it_ea        *it   = &info->oti_it_ea;
4653         struct file             *file = &it->oie_file;
4654         struct lu_object        *lo   = &dt->do_lu;
4655         struct dentry           *obj_dentry = &info->oti_it_dentry;
4656         ENTRY;
4657         LASSERT(lu_object_exists(lo));
4658
4659         obj_dentry->d_inode = obj->oo_inode;
4660         obj_dentry->d_sb = osd_sb(osd_obj2dev(obj));
4661         obj_dentry->d_name.hash = 0;
4662
4663         it->oie_rd_dirent       = 0;
4664         it->oie_it_dirent       = 0;
4665         it->oie_dirent          = NULL;
4666         it->oie_buf             = info->oti_it_ea_buf;
4667         it->oie_obj             = obj;
4668
4669         /* Reset the "file" totally to avoid to reuse any old value from
4670          * former readdir handling, the "file->f_pos" should be zero. */
4671         memset(file, 0, sizeof(*file));
4672         /* Only FMODE_64BITHASH or FMODE_32BITHASH should be set, NOT both. */
4673         if (attr & LUDA_64BITHASH)
4674                 file->f_mode    = FMODE_64BITHASH;
4675         else
4676                 file->f_mode    = FMODE_32BITHASH;
4677         file->f_dentry          = obj_dentry;
4678         file->f_mapping         = obj->oo_inode->i_mapping;
4679         file->f_op              = obj->oo_inode->i_fop;
4680         lu_object_get(lo);
4681         RETURN((struct dt_it *) it);
4682 }
4683
4684 /**
4685  * Destroy or finishes iterator context.
4686  *
4687  * \param di iterator structure to be destroyed
4688  */
4689 static void osd_it_ea_fini(const struct lu_env *env, struct dt_it *di)
4690 {
4691         struct osd_it_ea     *it   = (struct osd_it_ea *)di;
4692         struct osd_object    *obj  = it->oie_obj;
4693         struct inode       *inode  = obj->oo_inode;
4694
4695         ENTRY;
4696         it->oie_file.f_op->release(inode, &it->oie_file);
4697         lu_object_put(env, &obj->oo_dt.do_lu);
4698         EXIT;
4699 }
4700
4701 /**
4702  * It position the iterator at given key, so that next lookup continues from
4703  * that key Or it is similar to dio_it->load() but based on a key,
4704  * rather than file position.
4705  *
4706  * As a special convention, osd_it_ea_get(env, di, "") has to rewind iterator
4707  * to the beginning.
4708  *
4709  * TODO: Presently return +1 considering it is only used by mdd_dir_is_empty().
4710  */
4711 static int osd_it_ea_get(const struct lu_env *env,
4712                          struct dt_it *di, const struct dt_key *key)
4713 {
4714         struct osd_it_ea     *it   = (struct osd_it_ea *)di;
4715
4716         ENTRY;
4717         LASSERT(((const char *)key)[0] == '\0');
4718         it->oie_file.f_pos      = 0;
4719         it->oie_rd_dirent       = 0;
4720         it->oie_it_dirent       = 0;
4721         it->oie_dirent          = NULL;
4722
4723         RETURN(+1);
4724 }
4725
/**
 * Counterpart of osd_it_ea_get(); intentionally empty.
 */
static void osd_it_ea_put(const struct lu_env *env, struct dt_it *di)
{
}
4732
4733 /**
4734  * It is called internally by ->readdir(). It fills the
4735  * iterator's in-memory data structure with required
4736  * information i.e. name, namelen, rec_size etc.
4737  *
4738  * \param buf in which information to be filled in.
4739  * \param name name of the file in given dir
4740  *
4741  * \retval 0 on success
4742  * \retval 1 on buffer full
4743  */
4744 static int osd_ldiskfs_filldir(char *buf, const char *name, int namelen,
4745                                loff_t offset, __u64 ino,
4746                                unsigned d_type)
4747 {
4748         struct osd_it_ea        *it   = (struct osd_it_ea *)buf;
4749         struct osd_object       *obj  = it->oie_obj;
4750         struct osd_it_ea_dirent *ent  = it->oie_dirent;
4751         struct lu_fid           *fid  = &ent->oied_fid;
4752         struct osd_fid_pack     *rec;
4753         ENTRY;
4754
4755         /* this should never happen */
4756         if (unlikely(namelen == 0 || namelen > LDISKFS_NAME_LEN)) {
4757                 CERROR("ldiskfs return invalid namelen %d\n", namelen);
4758                 RETURN(-EIO);
4759         }
4760
4761         if ((void *) ent - it->oie_buf + sizeof(*ent) + namelen >
4762             OSD_IT_EA_BUFSIZE)
4763                 RETURN(1);
4764
4765         /* "." is just the object itself. */
4766         if (namelen == 1 && name[0] == '.') {
4767                 *fid = obj->oo_dt.do_lu.lo_header->loh_fid;
4768         } else if (d_type & LDISKFS_DIRENT_LUFID) {
4769                 rec = (struct osd_fid_pack*) (name + namelen + 1);
4770                 if (osd_fid_unpack(fid, rec) != 0)
4771                         fid_zero(fid);
4772         } else {
4773                 fid_zero(fid);
4774         }
4775         d_type &= ~LDISKFS_DIRENT_LUFID;
4776
4777         /* NOT export local root. */
4778         if (unlikely(osd_sb(osd_obj2dev(obj))->s_root->d_inode->i_ino == ino)) {
4779                 ino = obj->oo_inode->i_ino;
4780                 *fid = obj->oo_dt.do_lu.lo_header->loh_fid;
4781         }
4782
4783         ent->oied_ino     = ino;
4784         ent->oied_off     = offset;
4785         ent->oied_namelen = namelen;
4786         ent->oied_type    = d_type;
4787
4788         memcpy(ent->oied_name, name, namelen);
4789
4790         it->oie_rd_dirent++;
4791         it->oie_dirent = (void *) ent + cfs_size_round(sizeof(*ent) + namelen);
4792         RETURN(0);
4793 }
4794
4795 /**
4796  * Calls ->readdir() to load a directory entry at a time
4797  * and stored it in iterator's in-memory data structure.
4798  *
4799  * \param di iterator's in memory structure
4800  *
4801  * \retval   0 on success
4802  * \retval -ve on error
4803  */
4804 static int osd_ldiskfs_it_fill(const struct lu_env *env,
4805                                const struct dt_it *di)
4806 {
4807         struct osd_it_ea   *it    = (struct osd_it_ea *)di;
4808         struct osd_object  *obj   = it->oie_obj;
4809         struct inode       *inode = obj->oo_inode;
4810         struct htree_lock  *hlock = NULL;
4811         int                 result = 0;
4812
4813         ENTRY;
4814         it->oie_dirent = it->oie_buf;
4815         it->oie_rd_dirent = 0;
4816
4817         if (obj->oo_hl_head != NULL) {
4818                 hlock = osd_oti_get(env)->oti_hlock;
4819                 ldiskfs_htree_lock(hlock, obj->oo_hl_head,
4820                                    inode, LDISKFS_HLOCK_READDIR);
4821         } else {
4822                 down_read(&obj->oo_ext_idx_sem);
4823         }
4824
4825         result = inode->i_fop->readdir(&it->oie_file, it,
4826                                        (filldir_t) osd_ldiskfs_filldir);
4827
4828         if (hlock != NULL)
4829                 ldiskfs_htree_unlock(hlock);
4830         else
4831                 up_read(&obj->oo_ext_idx_sem);
4832
4833         if (it->oie_rd_dirent == 0) {
4834                 result = -EIO;
4835         } else {
4836                 it->oie_dirent = it->oie_buf;
4837                 it->oie_it_dirent = 1;
4838         }
4839
4840         RETURN(result);
4841 }
4842
4843 /**
4844  * It calls osd_ldiskfs_it_fill() which will use ->readdir()
4845  * to load a directory entry at a time and stored it in
4846  * iterator's in-memory data structure.
4847  *
4848  * \param di iterator's in memory structure
4849  *
4850  * \retval +ve iterator reached to end
4851  * \retval   0 iterator not reached to end
4852  * \retval -ve on error
4853  */
4854 static int osd_it_ea_next(const struct lu_env *env, struct dt_it *di)
4855 {
4856         struct osd_it_ea *it = (struct osd_it_ea *)di;
4857         int rc;
4858
4859         ENTRY;
4860
4861         if (it->oie_it_dirent < it->oie_rd_dirent) {
4862                 it->oie_dirent =
4863                         (void *) it->oie_dirent +
4864                         cfs_size_round(sizeof(struct osd_it_ea_dirent) +
4865                                        it->oie_dirent->oied_namelen);
4866                 it->oie_it_dirent++;
4867                 RETURN(0);
4868         } else {
4869                 if (it->oie_file.f_pos == ldiskfs_get_htree_eof(&it->oie_file))
4870                         rc = +1;
4871                 else
4872                         rc = osd_ldiskfs_it_fill(env, di);
4873         }
4874
4875         RETURN(rc);
4876 }
4877
4878 /**
4879  * Returns the key at current position from iterator's in memory structure.
4880  *
4881  * \param di iterator's in memory structure
4882  *
4883  * \retval key i.e. struct dt_key on success
4884  */
4885 static struct dt_key *osd_it_ea_key(const struct lu_env *env,
4886                                     const struct dt_it *di)
4887 {
4888         struct osd_it_ea *it = (struct osd_it_ea *)di;
4889
4890         return (struct dt_key *)it->oie_dirent->oied_name;
4891 }
4892
4893 /**
4894  * Returns the key's size at current position from iterator's in memory structure.
4895  *
4896  * \param di iterator's in memory structure
4897  *
4898  * \retval key_size i.e. struct dt_key on success
4899  */
4900 static int osd_it_ea_key_size(const struct lu_env *env, const struct dt_it *di)
4901 {
4902         struct osd_it_ea *it = (struct osd_it_ea *)di;
4903
4904         return it->oie_dirent->oied_namelen;
4905 }
4906
4907 static int
4908 osd_dirent_update(handle_t *jh, struct super_block *sb,
4909                   struct osd_it_ea_dirent *ent, struct lu_fid *fid,
4910                   struct buffer_head *bh, struct ldiskfs_dir_entry_2 *de)
4911 {
4912         struct osd_fid_pack *rec;
4913         int                  rc;
4914         ENTRY;
4915
4916         LASSERT(de->file_type & LDISKFS_DIRENT_LUFID);
4917         LASSERT(de->rec_len >= de->name_len + sizeof(struct osd_fid_pack));
4918
4919         rc = ldiskfs_journal_get_write_access(jh, bh);
4920         if (rc != 0) {
4921                 CERROR("%.16s: fail to write access for update dirent: "
4922                        "name = %.*s, rc = %d\n",
4923                        LDISKFS_SB(sb)->s_es->s_volume_name,
4924                        ent->oied_namelen, ent->oied_name, rc);
4925                 RETURN(rc);
4926         }
4927
4928         rec = (struct osd_fid_pack *)(de->name + de->name_len + 1);
4929         fid_cpu_to_be((struct lu_fid *)rec->fp_area, fid);
4930         rc = ldiskfs_journal_dirty_metadata(jh, bh);
4931         if (rc != 0)
4932                 CERROR("%.16s: fail to dirty metadata for update dirent: "
4933                        "name = %.*s, rc = %d\n",
4934                        LDISKFS_SB(sb)->s_es->s_volume_name,
4935                        ent->oied_namelen, ent->oied_name, rc);
4936
4937         RETURN(rc);
4938 }
4939
4940 static inline int
4941 osd_dirent_has_space(__u16 reclen, __u16 namelen, unsigned blocksize)
4942 {
4943         if (ldiskfs_rec_len_from_disk(reclen, blocksize) >=
4944             __LDISKFS_DIR_REC_LEN(namelen + 1 + sizeof(struct osd_fid_pack)))
4945                 return 1;
4946         else
4947                 return 0;
4948 }
4949
4950 static inline int
4951 osd_dot_dotdot_has_space(struct ldiskfs_dir_entry_2 *de, int dot_dotdot)
4952 {
4953         LASSERTF(dot_dotdot == 1 || dot_dotdot == 2,
4954                  "dot_dotdot = %d\n", dot_dotdot);
4955
4956         if (LDISKFS_DIR_REC_LEN(de) >=
4957             __LDISKFS_DIR_REC_LEN(dot_dotdot + 1 + sizeof(struct osd_fid_pack)))
4958                 return 1;
4959         else
4960                 return 0;
4961 }
4962
4963 static int
4964 osd_dirent_reinsert(const struct lu_env *env, handle_t *jh,
4965                     struct inode *dir, struct inode *inode,
4966                     struct osd_it_ea_dirent *ent, struct lu_fid *fid,
4967                     struct buffer_head *bh, struct ldiskfs_dir_entry_2 *de,
4968                     struct htree_lock *hlock)
4969 {
4970         struct dentry               *dentry;
4971         struct osd_fid_pack         *rec;
4972         struct ldiskfs_dentry_param *ldp;
4973         int                          rc;
4974         ENTRY;
4975
4976         if (!LDISKFS_HAS_INCOMPAT_FEATURE(inode->i_sb,
4977                                           LDISKFS_FEATURE_INCOMPAT_DIRDATA))
4978                 RETURN(0);
4979
4980         /* There is enough space to hold the FID-in-dirent. */
4981         if (osd_dirent_has_space(de->rec_len, ent->oied_namelen,
4982                                  dir->i_sb->s_blocksize)) {
4983                 rc = ldiskfs_journal_get_write_access(jh, bh);
4984                 if (rc != 0) {
4985                         CERROR("%.16s: fail to write access for reinsert "
4986                                "dirent: name = %.*s, rc = %d\n",
4987                                LDISKFS_SB(inode->i_sb)->s_es->s_volume_name,
4988                                ent->oied_namelen, ent->oied_name, rc);
4989                         RETURN(rc);
4990                 }
4991
4992                 de->name[de->name_len] = 0;
4993                 rec = (struct osd_fid_pack *)(de->name + de->name_len + 1);
4994                 rec->fp_len = sizeof(struct lu_fid) + 1;
4995                 fid_cpu_to_be((struct lu_fid *)rec->fp_area, fid);
4996                 de->file_type |= LDISKFS_DIRENT_LUFID;
4997
4998                 rc = ldiskfs_journal_dirty_metadata(jh, bh);
4999                 if (rc != 0)
5000                         CERROR("%.16s: fail to dirty metadata for reinsert "
5001                                "dirent: name = %.*s, rc = %d\n",
5002                                LDISKFS_SB(inode->i_sb)->s_es->s_volume_name,
5003                                ent->oied_namelen, ent->oied_name, rc);
5004
5005                 RETURN(rc);
5006         }
5007
5008         rc = ldiskfs_delete_entry(jh, dir, de, bh);
5009         if (rc != 0) {
5010                 CERROR("%.16s: fail to delete entry for reinsert dirent: "
5011                        "name = %.*s, rc = %d\n",
5012                        LDISKFS_SB(inode->i_sb)->s_es->s_volume_name,
5013                        ent->oied_namelen, ent->oied_name, rc);
5014                 RETURN(rc);
5015         }
5016
5017         dentry = osd_child_dentry_by_inode(env, dir, ent->oied_name,
5018                                            ent->oied_namelen);
5019         ldp = (struct ldiskfs_dentry_param *)osd_oti_get(env)->oti_ldp;
5020         osd_get_ldiskfs_dirent_param(ldp, (const struct dt_rec *)fid);
5021         dentry->d_fsdata = (void *)ldp;
5022         ll_vfs_dq_init(dir);
5023         rc = osd_ldiskfs_add_entry(jh, dentry, inode, hlock);
5024         /* It is too bad, we cannot reinsert the name entry back.
5025          * That means we lose it! */
5026         if (rc != 0)
5027                 CERROR("%.16s: fail to insert entry for reinsert dirent: "
5028                        "name = %.*s, rc = %d\n",
5029                        LDISKFS_SB(inode->i_sb)->s_es->s_volume_name,
5030                        ent->oied_namelen, ent->oied_name, rc);
5031
5032         RETURN(rc);
5033 }
5034
5035 static int
5036 osd_dirent_check_repair(const struct lu_env *env, struct osd_object *obj,
5037                         struct osd_it_ea *it, struct lu_fid *fid,
5038                         struct osd_inode_id *id, __u32 *attr)
5039 {
5040         struct osd_thread_info     *info        = osd_oti_get(env);
5041         struct lustre_mdt_attrs    *lma         = &info->oti_mdt_attrs;
5042         struct osd_device          *dev         = osd_obj2dev(obj);
5043         struct super_block         *sb          = osd_sb(dev);
5044         const char                 *devname     =
5045                                         LDISKFS_SB(sb)->s_es->s_volume_name;
5046         struct osd_it_ea_dirent    *ent         = it->oie_dirent;
5047         struct inode               *dir         = obj->oo_inode;
5048         struct htree_lock          *hlock       = NULL;
5049         struct buffer_head         *bh          = NULL;
5050         handle_t                   *jh          = NULL;
5051         struct ldiskfs_dir_entry_2 *de;
5052         struct dentry              *dentry;
5053         struct inode               *inode;
5054         int                         credits;
5055         int                         rc;
5056         int                         dot_dotdot  = 0;
5057         bool                        dirty       = false;
5058         ENTRY;
5059
5060         if (ent->oied_name[0] == '.') {
5061                 if (ent->oied_namelen == 1)
5062                         dot_dotdot = 1;
5063                 else if (ent->oied_namelen == 2 && ent->oied_name[1] == '.')
5064                         dot_dotdot = 2;
5065         }
5066
5067         dentry = osd_child_dentry_get(env, obj, ent->oied_name,
5068                                       ent->oied_namelen);
5069
5070         /* We need to ensure that the name entry is still valid.
5071          * Because it may be removed or renamed by other already.
5072          *
5073          * The unlink or rename operation will start journal before PDO lock,
5074          * so to avoid deadlock, here we need to start journal handle before
5075          * related PDO lock also. But because we do not know whether there
5076          * will be something to be repaired before PDO lock, we just start
5077          * journal without conditions.
5078          *
5079          * We may need to remove the name entry firstly, then insert back.
5080          * One credit is for user quota file update.
5081          * One credit is for group quota file update.
5082          * Two credits are for dirty inode. */
5083         credits = osd_dto_credits_noquota[DTO_INDEX_DELETE] +
5084                   osd_dto_credits_noquota[DTO_INDEX_INSERT] + 1 + 1 + 2;
5085
5086 again:
5087         if (dev->od_dirent_journal) {
5088                 jh = osd_journal_start_sb(sb, LDISKFS_HT_MISC, credits);
5089                 if (IS_ERR(jh)) {
5090                         rc = PTR_ERR(jh);
5091                         CERROR("%.16s: fail to start trans for dirent "
5092                                "check_repair: credits %d, name %.*s, rc %d\n",
5093                                devname, credits, ent->oied_namelen,
5094                                ent->oied_name, rc);
5095                         RETURN(rc);
5096                 }
5097
5098                 if (obj->oo_hl_head != NULL) {
5099                         hlock = osd_oti_get(env)->oti_hlock;
5100                         /* "0" means exclusive lock for the whole directory.
5101                          * We need to prevent others access such name entry
5102                          * during the delete + insert. Neither HLOCK_ADD nor
5103                          * HLOCK_DEL cannot guarantee the atomicity. */
5104                         ldiskfs_htree_lock(hlock, obj->oo_hl_head, dir, 0);
5105                 } else {
5106                         down_write(&obj->oo_ext_idx_sem);
5107                 }
5108         } else {
5109                 if (obj->oo_hl_head != NULL) {
5110                         hlock = osd_oti_get(env)->oti_hlock;
5111                         ldiskfs_htree_lock(hlock, obj->oo_hl_head, dir,
5112                                            LDISKFS_HLOCK_LOOKUP);
5113                 } else {
5114                         down_read(&obj->oo_ext_idx_sem);
5115                 }
5116         }
5117
5118         bh = osd_ldiskfs_find_entry(dir, &dentry->d_name, &de, NULL, hlock);
5119         /* For dot/dotdot entry, if there is not enough space to hold the
5120          * FID-in-dirent, just keep them there. It only happens when the
5121          * device upgraded from 1.8 or restored from MDT file-level backup.
5122          * For the whole directory, only dot/dotdot entry have no FID-in-dirent
5123          * and needs to get FID from LMA when readdir, it will not affect the
5124          * performance much. */
5125         if ((bh == NULL) || (le32_to_cpu(de->inode) != ent->oied_ino) ||
5126             (dot_dotdot != 0 && !osd_dot_dotdot_has_space(de, dot_dotdot))) {
5127                 *attr |= LUDA_IGNORE;
5128                 GOTO(out_journal, rc = 0);
5129         }
5130
5131         osd_id_gen(id, ent->oied_ino, OSD_OII_NOGEN);
5132         inode = osd_iget(info, dev, id);
5133         if (IS_ERR(inode)) {
5134                 rc = PTR_ERR(inode);
5135                 if (rc == -ENOENT || rc == -ESTALE) {
5136                         *attr |= LUDA_IGNORE;
5137                         rc = 0;
5138                 }
5139
5140                 GOTO(out_journal, rc);
5141         }
5142
5143         /* skip the REMOTE_PARENT_DIR. */
5144         if (inode == dev->od_mdt_map->omm_remote_parent->d_inode)
5145                 GOTO(out_inode, rc = 0);
5146
5147         rc = osd_get_lma(info, inode, &info->oti_obj_dentry, lma);
5148         if (rc == 0) {
5149                 LASSERT(!(lma->lma_compat & LMAC_NOT_IN_OI));
5150
5151                 if (fid_is_sane(fid)) {
5152                         /* FID-in-dirent is valid. */
5153                         if (lu_fid_eq(fid, &lma->lma_self_fid))
5154                                 GOTO(out_inode, rc = 0);
5155
5156                         /* Do not repair under dryrun mode. */
5157                         if (*attr & LUDA_VERIFY_DRYRUN) {
5158                                 *attr |= LUDA_REPAIR;
5159                                 GOTO(out_inode, rc = 0);
5160                         }
5161
5162                         if (!dev->od_dirent_journal) {
5163                                 iput(inode);
5164                                 brelse(bh);
5165                                 if (hlock != NULL)
5166                                         ldiskfs_htree_unlock(hlock);
5167                                 else
5168                                         up_read(&obj->oo_ext_idx_sem);
5169                                 dev->od_dirent_journal = 1;
5170                                 goto again;
5171                         }
5172
5173                         *fid = lma->lma_self_fid;
5174                         dirty = true;
5175                         /* Update the FID-in-dirent. */
5176                         rc = osd_dirent_update(jh, sb, ent, fid, bh, de);
5177                         if (rc == 0)
5178                                 *attr |= LUDA_REPAIR;
5179                 } else {
5180                         /* Do not repair under dryrun mode. */
5181                         if (*attr & LUDA_VERIFY_DRYRUN) {
5182                                 *fid = lma->lma_self_fid;
5183                                 *attr |= LUDA_REPAIR;
5184                                 GOTO(out_inode, rc = 0);
5185                         }
5186
5187                         if (!dev->od_dirent_journal) {
5188                                 iput(inode);
5189                                 brelse(bh);
5190                                 if (hlock != NULL)
5191                                         ldiskfs_htree_unlock(hlock);
5192                                 else
5193                                         up_read(&obj->oo_ext_idx_sem);
5194                                 dev->od_dirent_journal = 1;
5195                                 goto again;
5196                         }
5197
5198                         *fid = lma->lma_self_fid;
5199                         dirty = true;
5200                         /* Append the FID-in-dirent. */
5201                         rc = osd_dirent_reinsert(env, jh, dir, inode, ent,
5202                                                  fid, bh, de, hlock);
5203                         if (rc == 0)
5204                                 *attr |= LUDA_REPAIR;
5205                 }
5206         } else if (rc == -ENODATA) {
5207                 /* Do not repair under dryrun mode. */
5208                 if (*attr & LUDA_VERIFY_DRYRUN) {
5209                         if (fid_is_sane(fid)) {
5210                                 *attr |= LUDA_REPAIR;
5211                         } else {
5212                                 lu_igif_build(fid, inode->i_ino,
5213                                               inode->i_generation);
5214                                 *attr |= LUDA_UPGRADE;
5215                         }
5216                         GOTO(out_inode, rc = 0);
5217                 }
5218
5219                 if (!dev->od_dirent_journal) {
5220                         iput(inode);
5221                         brelse(bh);
5222                         if (hlock != NULL)
5223                                 ldiskfs_htree_unlock(hlock);
5224                         else
5225                                 up_read(&obj->oo_ext_idx_sem);
5226                         dev->od_dirent_journal = 1;
5227                         goto again;
5228                 }
5229
5230                 dirty = true;
5231                 if (unlikely(fid_is_sane(fid))) {
5232                         /* FID-in-dirent exists, but FID-in-LMA is lost.
5233                          * Trust the FID-in-dirent, and add FID-in-LMA. */
5234                         rc = osd_ea_fid_set(info, inode, fid, 0, 0);
5235                         if (rc == 0)
5236                                 *attr |= LUDA_REPAIR;
5237                 } else {
5238                         lu_igif_build(fid, inode->i_ino, inode->i_generation);
5239                         /* It is probably IGIF object. Only aappend the
5240                          * FID-in-dirent. OI scrub will process FID-in-LMA. */
5241                         rc = osd_dirent_reinsert(env, jh, dir, inode, ent,
5242                                                  fid, bh, de, hlock);
5243                         if (rc == 0)
5244                                 *attr |= LUDA_UPGRADE;
5245                 }
5246         }
5247
5248         GOTO(out_inode, rc);
5249
5250 out_inode:
5251         iput(inode);
5252
5253 out_journal:
5254         brelse(bh);
5255         if (hlock != NULL) {
5256                 ldiskfs_htree_unlock(hlock);
5257         } else {
5258                 if (dev->od_dirent_journal)
5259                         up_write(&obj->oo_ext_idx_sem);
5260                 else
5261                         up_read(&obj->oo_ext_idx_sem);
5262         }
5263         if (jh != NULL)
5264                 ldiskfs_journal_stop(jh);
5265         if (rc >= 0 && !dirty)
5266                 dev->od_dirent_journal = 0;
5267         return rc;
5268 }
5269
5270 /**
5271  * Returns the value at current position from iterator's in memory structure.
5272  *
5273  * \param di struct osd_it_ea, iterator's in memory structure
5274  * \param attr attr requested for dirent.
5275  * \param lde lustre dirent
5276  *
5277  * \retval   0 no error and \param lde has correct lustre dirent.
5278  * \retval -ve on error
5279  */
5280 static inline int osd_it_ea_rec(const struct lu_env *env,
5281                                 const struct dt_it *di,
5282                                 struct dt_rec *dtrec, __u32 attr)
5283 {
5284         struct osd_it_ea       *it    = (struct osd_it_ea *)di;
5285         struct osd_object      *obj   = it->oie_obj;
5286         struct osd_device      *dev   = osd_obj2dev(obj);
5287         struct osd_scrub       *scrub = &dev->od_scrub;
5288         struct scrub_file      *sf    = &scrub->os_file;
5289         struct osd_thread_info *oti   = osd_oti_get(env);
5290         struct osd_inode_id    *id    = &oti->oti_id;
5291         struct osd_idmap_cache *oic   = &oti->oti_cache;
5292         struct lu_fid          *fid   = &it->oie_dirent->oied_fid;
5293         struct lu_dirent       *lde   = (struct lu_dirent *)dtrec;
5294         __u32                   ino   = it->oie_dirent->oied_ino;
5295         int                     rc    = 0;
5296         ENTRY;
5297
5298         if (attr & LUDA_VERIFY) {
5299                 attr |= LUDA_TYPE;
5300                 if (unlikely(ino == osd_sb(dev)->s_root->d_inode->i_ino)) {
5301                         attr |= LUDA_IGNORE;
5302                         rc = 0;
5303                 } else {
5304                         rc = osd_dirent_check_repair(env, obj, it, fid, id,
5305                                                      &attr);
5306                 }
5307         } else {
5308                 attr &= ~LU_DIRENT_ATTRS_MASK;
5309                 if (!fid_is_sane(fid)) {
5310                         if (OBD_FAIL_CHECK(OBD_FAIL_FID_LOOKUP) &&
5311                             likely(it->oie_dirent->oied_namelen != 2 ||
5312                                    it->oie_dirent->oied_name[0] != '.' ||
5313                                    it->oie_dirent->oied_name[1] != '.'))
5314                                 RETURN(-ENOENT);
5315
5316                         rc = osd_ea_fid_get(env, obj, ino, fid, id);
5317                 } else {
5318                         osd_id_gen(id, ino, OSD_OII_NOGEN);
5319                 }
5320         }
5321
5322         /* Pack the entry anyway, at least the offset is right. */
5323         osd_it_pack_dirent(lde, fid, it->oie_dirent->oied_off,
5324                            it->oie_dirent->oied_name,
5325                            it->oie_dirent->oied_namelen,
5326                            it->oie_dirent->oied_type, attr);
5327
5328         if (rc < 0)
5329                 RETURN(rc);
5330
5331         if (osd_remote_fid(env, dev, fid))
5332                 RETURN(0);
5333
5334         if (likely(!(attr & LUDA_IGNORE)))
5335                 rc = osd_add_oi_cache(oti, dev, id, fid);
5336
5337         if (!(attr & LUDA_VERIFY) &&
5338             (scrub->os_pos_current <= ino) &&
5339             ((sf->sf_flags & SF_INCONSISTENT) ||
5340              (sf->sf_flags & SF_UPGRADE && fid_is_igif(fid)) ||
5341              ldiskfs_test_bit(osd_oi_fid2idx(dev, fid), sf->sf_oi_bitmap)))
5342                 osd_consistency_check(oti, dev, oic);
5343
5344         RETURN(rc);
5345 }
5346
5347 /**
5348  * Returns a cookie for current position of the iterator head, so that
5349  * user can use this cookie to load/start the iterator next time.
5350  *
5351  * \param di iterator's in memory structure
5352  *
5353  * \retval cookie for current position, on success
5354  */
5355 static __u64 osd_it_ea_store(const struct lu_env *env, const struct dt_it *di)
5356 {
5357         struct osd_it_ea *it = (struct osd_it_ea *)di;
5358
5359         return it->oie_dirent->oied_off;
5360 }
5361
5362 /**
5363  * It calls osd_ldiskfs_it_fill() which will use ->readdir()
5364  * to load a directory entry at a time and stored it i inn,
5365  * in iterator's in-memory data structure.
5366  *
5367  * \param di struct osd_it_ea, iterator's in memory structure
5368  *
5369  * \retval +ve on success
5370  * \retval -ve on error
5371  */
5372 static int osd_it_ea_load(const struct lu_env *env,
5373                           const struct dt_it *di, __u64 hash)
5374 {
5375         struct osd_it_ea *it = (struct osd_it_ea *)di;
5376         int rc;
5377
5378         ENTRY;
5379         it->oie_file.f_pos = hash;
5380
5381         rc =  osd_ldiskfs_it_fill(env, di);
5382         if (rc == 0)
5383                 rc = +1;
5384
5385         RETURN(rc);
5386 }
5387
5388 /**
5389  * Index lookup function for interoperability mode (b11826).
5390  *
5391  * \param key,  key i.e. file name to be searched
5392  *
5393  * \retval +ve, on success
5394  * \retval -ve, on error
5395  */
5396 static int osd_index_ea_lookup(const struct lu_env *env, struct dt_object *dt,
5397                                struct dt_rec *rec, const struct dt_key *key,
5398                                struct lustre_capa *capa)
5399 {
5400         struct osd_object *obj = osd_dt_obj(dt);
5401         int rc = 0;
5402
5403         ENTRY;
5404
5405         LASSERT(S_ISDIR(obj->oo_inode->i_mode));
5406         LINVRNT(osd_invariant(obj));
5407
5408         if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
5409                 return -EACCES;
5410
5411         rc = osd_ea_lookup_rec(env, obj, rec, key);
5412         if (rc == 0)
5413                 rc = +1;
5414         RETURN(rc);
5415 }
5416
5417 /**
5418  * Index and Iterator operations for interoperability
5419  * mode (i.e. to run 2.0 mds on 1.8 disk) (b11826)
5420  */
5421 static const struct dt_index_operations osd_index_ea_ops = {
5422         .dio_lookup         = osd_index_ea_lookup,
5423         .dio_declare_insert = osd_index_declare_ea_insert,
5424         .dio_insert         = osd_index_ea_insert,
5425         .dio_declare_delete = osd_index_declare_ea_delete,
5426         .dio_delete         = osd_index_ea_delete,
5427         .dio_it     = {
5428                 .init     = osd_it_ea_init,
5429                 .fini     = osd_it_ea_fini,
5430                 .get      = osd_it_ea_get,
5431                 .put      = osd_it_ea_put,
5432                 .next     = osd_it_ea_next,
5433                 .key      = osd_it_ea_key,
5434                 .key_size = osd_it_ea_key_size,
5435                 .rec      = osd_it_ea_rec,
5436                 .store    = osd_it_ea_store,
5437                 .load     = osd_it_ea_load
5438         }
5439 };
5440
5441 static void *osd_key_init(const struct lu_context *ctx,
5442                           struct lu_context_key *key)
5443 {
5444         struct osd_thread_info *info;
5445
5446         OBD_ALLOC_PTR(info);
5447         if (info == NULL)
5448                 return ERR_PTR(-ENOMEM);
5449
5450         OBD_ALLOC(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
5451         if (info->oti_it_ea_buf == NULL)
5452                 goto out_free_info;
5453
5454         info->oti_env = container_of(ctx, struct lu_env, le_ctx);
5455
5456         info->oti_hlock = ldiskfs_htree_lock_alloc();
5457         if (info->oti_hlock == NULL)
5458                 goto out_free_ea;
5459
5460         return info;
5461
5462  out_free_ea:
5463         OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
5464  out_free_info:
5465         OBD_FREE_PTR(info);
5466         return ERR_PTR(-ENOMEM);
5467 }
5468
5469 static void osd_key_fini(const struct lu_context *ctx,
5470                          struct lu_context_key *key, void* data)
5471 {
5472         struct osd_thread_info *info = data;
5473
5474         if (info->oti_hlock != NULL)
5475                 ldiskfs_htree_lock_free(info->oti_hlock);
5476         OBD_FREE(info->oti_it_ea_buf, OSD_IT_EA_BUFSIZE);
5477         lu_buf_free(&info->oti_iobuf.dr_pg_buf);
5478         lu_buf_free(&info->oti_iobuf.dr_bl_buf);
5479         OBD_FREE_PTR(info);
5480 }
5481
5482 static void osd_key_exit(const struct lu_context *ctx,
5483                          struct lu_context_key *key, void *data)
5484 {
5485         struct osd_thread_info *info = data;
5486
5487         LASSERT(info->oti_r_locks == 0);
5488         LASSERT(info->oti_w_locks == 0);
5489         LASSERT(info->oti_txns    == 0);
5490 }
5491
5492 /* type constructor/destructor: osd_type_init, osd_type_fini */
5493 LU_TYPE_INIT_FINI(osd, &osd_key);
5494
5495 struct lu_context_key osd_key = {
5496         .lct_tags = LCT_DT_THREAD | LCT_MD_THREAD | LCT_MG_THREAD | LCT_LOCAL,
5497         .lct_init = osd_key_init,
5498         .lct_fini = osd_key_fini,
5499         .lct_exit = osd_key_exit
5500 };
5501
5502
5503 static int osd_device_init(const struct lu_env *env, struct lu_device *d,
5504                            const char *name, struct lu_device *next)
5505 {
5506         struct osd_device *osd = osd_dev(d);
5507
5508         if (strlcpy(osd->od_svname, name, sizeof(osd->od_svname))
5509             >= sizeof(osd->od_svname))
5510                 return -E2BIG;
5511         return osd_procfs_init(osd, name);
5512 }
5513
5514 static int osd_shutdown(const struct lu_env *env, struct osd_device *o)
5515 {
5516         ENTRY;
5517
5518         /* shutdown quota slave instance associated with the device */
5519         if (o->od_quota_slave != NULL) {
5520                 qsd_fini(env, o->od_quota_slave);
5521                 o->od_quota_slave = NULL;
5522         }
5523
5524         RETURN(0);
5525 }
5526
5527 static void osd_umount(const struct lu_env *env, struct osd_device *o)
5528 {
5529         ENTRY;
5530
5531         if (o->od_fsops) {
5532                 fsfilt_put_ops(o->od_fsops);
5533                 o->od_fsops = NULL;
5534         }
5535
5536         if (o->od_mnt != NULL) {
5537                 shrink_dcache_sb(osd_sb(o));
5538                 osd_sync(env, &o->od_dt_dev);
5539
5540                 mntput(o->od_mnt);
5541                 o->od_mnt = NULL;
5542         }
5543
5544         EXIT;
5545 }
5546
5547 static int osd_mount(const struct lu_env *env,
5548                      struct osd_device *o, struct lustre_cfg *cfg)
5549 {
5550         const char              *name  = lustre_cfg_string(cfg, 0);
5551         const char              *dev  = lustre_cfg_string(cfg, 1);
5552         const char              *opts;
5553         unsigned long            page, s_flags, lmd_flags = 0;
5554         struct page             *__page;
5555         struct file_system_type *type;
5556         char                    *options = NULL;
5557         char                    *str;
5558         struct osd_thread_info  *info = osd_oti_get(env);
5559         struct lu_fid           *fid = &info->oti_fid;
5560         struct inode            *inode;
5561         int                      rc = 0;
5562         ENTRY;
5563
5564         if (o->od_mnt != NULL)
5565                 RETURN(0);
5566
5567         if (strlen(dev) >= sizeof(o->od_mntdev))
5568                 RETURN(-E2BIG);
5569         strcpy(o->od_mntdev, dev);
5570
5571         o->od_fsops = fsfilt_get_ops(mt_str(LDD_MT_LDISKFS));
5572         if (IS_ERR(o->od_fsops)) {
5573                 CERROR("%s: Can't find fsfilt_ldiskfs\n", name);
5574                 o->od_fsops = NULL;
5575                 RETURN(-ENOTSUPP);
5576         }
5577
5578         OBD_PAGE_ALLOC(__page, GFP_IOFS);
5579         if (__page == NULL)
5580                 GOTO(out, rc = -ENOMEM);
5581
5582         str = lustre_cfg_string(cfg, 2);
5583         s_flags = simple_strtoul(str, NULL, 0);
5584         str = strstr(str, ":");
5585         if (str)
5586                 lmd_flags = simple_strtoul(str + 1, NULL, 0);
5587         opts = lustre_cfg_string(cfg, 3);
5588         page = (unsigned long)page_address(__page);
5589         options = (char *)page;
5590         *options = '\0';
5591         if (opts == NULL)
5592                 strcat(options, "user_xattr,acl");
5593         else
5594                 strcat(options, opts);
5595
5596         /* Glom up mount options */
5597         if (*options != '\0')
5598                 strcat(options, ",");
5599         strlcat(options, "no_mbcache", PAGE_CACHE_SIZE);
5600
5601         type = get_fs_type("ldiskfs");
5602         if (!type) {
5603                 CERROR("%s: cannot find ldiskfs module\n", name);
5604                 GOTO(out, rc = -ENODEV);
5605         }
5606
5607         o->od_mnt = vfs_kern_mount(type, s_flags, dev, options);
5608         module_put(type->owner);
5609
5610         if (IS_ERR(o->od_mnt)) {
5611                 rc = PTR_ERR(o->od_mnt);
5612                 o->od_mnt = NULL;
5613                 CERROR("%s: can't mount %s: %d\n", name, dev, rc);
5614                 GOTO(out, rc);
5615         }
5616
5617 #ifdef HAVE_DEV_SET_RDONLY
5618         if (dev_check_rdonly(o->od_mnt->mnt_sb->s_bdev)) {
5619                 CERROR("%s: underlying device %s is marked as read-only. "
5620                        "Setup failed\n", name, dev);
5621                 GOTO(out_mnt, rc = -EROFS);
5622         }
5623 #endif
5624
5625         if (!LDISKFS_HAS_COMPAT_FEATURE(o->od_mnt->mnt_sb,
5626                                         LDISKFS_FEATURE_COMPAT_HAS_JOURNAL)) {
5627                 CERROR("%s: device %s is mounted w/o journal\n", name, dev);
5628                 GOTO(out_mnt, rc = -EINVAL);
5629         }
5630
5631 #ifdef LDISKFS_MOUNT_DIRDATA
5632         if (LDISKFS_HAS_INCOMPAT_FEATURE(o->od_mnt->mnt_sb,
5633                                          LDISKFS_FEATURE_INCOMPAT_DIRDATA))
5634                 LDISKFS_SB(osd_sb(o))->s_mount_opt |= LDISKFS_MOUNT_DIRDATA;
5635 #endif
5636         inode = osd_sb(o)->s_root->d_inode;
5637         ldiskfs_set_inode_state(inode, LDISKFS_STATE_LUSTRE_NO_OI);
5638         lu_local_obj_fid(fid, OSD_FS_ROOT_OID);
5639         rc = osd_ea_fid_set(info, inode, fid, LMAC_NOT_IN_OI, 0);
5640         if (rc != 0) {
5641                 CERROR("%s: failed to set lma on %s root inode\n", name, dev);
5642                 GOTO(out_mnt, rc);
5643         }
5644
5645         if (lmd_flags & LMD_FLG_NOSCRUB)
5646                 o->od_noscrub = 1;
5647
5648         GOTO(out, rc = 0);
5649
5650 out_mnt:
5651         mntput(o->od_mnt);
5652         o->od_mnt = NULL;
5653
5654 out:
5655         if (__page)
5656                 OBD_PAGE_FREE(__page);
5657         if (rc)
5658                 fsfilt_put_ops(o->od_fsops);
5659
5660         return rc;
5661 }
5662
5663 static struct lu_device *osd_device_fini(const struct lu_env *env,
5664                                          struct lu_device *d)
5665 {
5666         struct osd_device *o = osd_dev(d);
5667         ENTRY;
5668
5669         osd_shutdown(env, o);
5670         osd_procfs_fini(o);
5671         osd_scrub_cleanup(env, o);
5672         osd_obj_map_fini(o);
5673         osd_umount(env, o);
5674
5675         RETURN(NULL);
5676 }
5677
5678 static int osd_device_init0(const struct lu_env *env,
5679                             struct osd_device *o,
5680                             struct lustre_cfg *cfg)
5681 {
5682         struct lu_device        *l = osd2lu_dev(o);
5683         struct osd_thread_info *info;
5684         int                     rc;
5685         int                     cplen = 0;
5686
5687         /* if the module was re-loaded, env can loose its keys */
5688         rc = lu_env_refill((struct lu_env *) env);
5689         if (rc)
5690                 GOTO(out, rc);
5691         info = osd_oti_get(env);
5692         LASSERT(info);
5693
5694         l->ld_ops = &osd_lu_ops;
5695         o->od_dt_dev.dd_ops = &osd_dt_ops;
5696
5697         spin_lock_init(&o->od_osfs_lock);
5698         mutex_init(&o->od_otable_mutex);
5699         o->od_osfs_age = cfs_time_shift_64(-1000);
5700
5701         o->od_capa_hash = init_capa_hash();
5702         if (o->od_capa_hash == NULL)
5703                 GOTO(out, rc = -ENOMEM);
5704
5705         o->od_read_cache = 1;
5706         o->od_writethrough_cache = 1;
5707         o->od_readcache_max_filesize = OSD_MAX_CACHE_SIZE;
5708
5709         rc = osd_mount(env, o, cfg);
5710         if (rc)
5711                 GOTO(out_capa, rc);
5712
5713         cplen = strlcpy(o->od_svname, lustre_cfg_string(cfg, 4),
5714                         sizeof(o->od_svname));
5715         if (cplen >= sizeof(o->od_svname)) {
5716                 rc = -E2BIG;
5717                 GOTO(out_mnt, rc);
5718         }
5719
5720         if (server_name_is_ost(o->od_svname))
5721                 o->od_is_ost = 1;
5722
5723         rc = osd_obj_map_init(env, o);
5724         if (rc != 0)
5725                 GOTO(out_mnt, rc);
5726
5727         rc = lu_site_init(&o->od_site, l);
5728         if (rc != 0)
5729                 GOTO(out_compat, rc);
5730         o->od_site.ls_bottom_dev = l;
5731
5732         rc = lu_site_init_finish(&o->od_site);
5733         if (rc != 0)
5734                 GOTO(out_site, rc);
5735
5736         CFS_INIT_LIST_HEAD(&o->od_ios_list);
5737         /* setup scrub, including OI files initialization */
5738         rc = osd_scrub_setup(env, o);
5739         if (rc < 0)
5740                 GOTO(out_site, rc);
5741
5742         rc = osd_procfs_init(o, o->od_svname);
5743         if (rc != 0) {
5744                 CERROR("%s: can't initialize procfs: rc = %d\n",
5745                        o->od_svname, rc);
5746                 GOTO(out_scrub, rc);
5747         }
5748
5749         LASSERT(l->ld_site->ls_linkage.next && l->ld_site->ls_linkage.prev);
5750
5751         /* initialize quota slave instance */
5752         o->od_quota_slave = qsd_init(env, o->od_svname, &o->od_dt_dev,
5753                                      o->od_proc_entry);
5754         if (IS_ERR(o->od_quota_slave)) {
5755                 rc = PTR_ERR(o->od_quota_slave);
5756                 o->od_quota_slave = NULL;
5757                 GOTO(out_procfs, rc);
5758         }
5759
5760         RETURN(0);
5761
5762 out_procfs:
5763         osd_procfs_fini(o);
5764 out_scrub:
5765         osd_scrub_cleanup(env, o);
5766 out_site:
5767         lu_site_fini(&o->od_site);
5768 out_compat:
5769         osd_obj_map_fini(o);
5770 out_mnt:
5771         osd_umount(env, o);
5772 out_capa:
5773         cleanup_capa_hash(o->od_capa_hash);
5774 out:
5775         return rc;
5776 }
5777
5778 static struct lu_device *osd_device_alloc(const struct lu_env *env,
5779                                           struct lu_device_type *t,
5780                                           struct lustre_cfg *cfg)
5781 {
5782         struct osd_device *o;
5783         int                rc;
5784
5785         OBD_ALLOC_PTR(o);
5786         if (o == NULL)
5787                 return ERR_PTR(-ENOMEM);
5788
5789         rc = dt_device_init(&o->od_dt_dev, t);
5790         if (rc == 0) {
5791                 /* Because the ctx might be revived in dt_device_init,
5792                  * refill the env here */
5793                 lu_env_refill((struct lu_env *)env);
5794                 rc = osd_device_init0(env, o, cfg);
5795                 if (rc)
5796                         dt_device_fini(&o->od_dt_dev);
5797         }
5798
5799         if (unlikely(rc != 0))
5800                 OBD_FREE_PTR(o);
5801
5802         return rc == 0 ? osd2lu_dev(o) : ERR_PTR(rc);
5803 }
5804
5805 static struct lu_device *osd_device_free(const struct lu_env *env,
5806                                          struct lu_device *d)
5807 {
5808         struct osd_device *o = osd_dev(d);
5809         ENTRY;
5810
5811         cleanup_capa_hash(o->od_capa_hash);
5812         /* XXX: make osd top device in order to release reference */
5813         d->ld_site->ls_top_dev = d;
5814         lu_site_purge(env, d->ld_site, -1);
5815         if (!cfs_hash_is_empty(d->ld_site->ls_obj_hash)) {
5816                 LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
5817                 lu_site_print(env, d->ld_site, &msgdata, lu_cdebug_printer);
5818         }
5819         lu_site_fini(&o->od_site);
5820         dt_device_fini(&o->od_dt_dev);
5821         OBD_FREE_PTR(o);
5822         RETURN(NULL);
5823 }
5824
5825 static int osd_process_config(const struct lu_env *env,
5826                               struct lu_device *d, struct lustre_cfg *cfg)
5827 {
5828         struct osd_device               *o = osd_dev(d);
5829         int                             rc;
5830         ENTRY;
5831
5832         switch (cfg->lcfg_command) {
5833         case LCFG_SETUP:
5834                 rc = osd_mount(env, o, cfg);
5835                 break;
5836         case LCFG_CLEANUP:
5837                 lu_dev_del_linkage(d->ld_site, d);
5838                 rc = osd_shutdown(env, o);
5839                 break;
5840         case LCFG_PARAM:
5841                 LASSERT(&o->od_dt_dev);
5842                 rc = class_process_proc_param(PARAM_OSD, lprocfs_osd_obd_vars,
5843                                               cfg, &o->od_dt_dev);
5844                 if (rc > 0 || rc == -ENOSYS)
5845                         rc = class_process_proc_param(PARAM_OST,
5846                                                       lprocfs_osd_obd_vars,
5847                                                       cfg, &o->od_dt_dev);
5848                 break;
5849         default:
5850                 rc = -ENOSYS;
5851         }
5852
5853         RETURN(rc);
5854 }
5855
5856 static int osd_recovery_complete(const struct lu_env *env,
5857                                  struct lu_device *d)
5858 {
5859         struct osd_device       *osd = osd_dev(d);
5860         int                      rc = 0;
5861         ENTRY;
5862
5863         if (osd->od_quota_slave == NULL)
5864                 RETURN(0);
5865
5866         /* start qsd instance on recovery completion, this notifies the quota
5867          * slave code that we are about to process new requests now */
5868         rc = qsd_start(env, osd->od_quota_slave);
5869         RETURN(rc);
5870 }
5871
5872 /*
5873  * we use exports to track all osd users
5874  */
5875 static int osd_obd_connect(const struct lu_env *env, struct obd_export **exp,
5876                            struct obd_device *obd, struct obd_uuid *cluuid,
5877                            struct obd_connect_data *data, void *localdata)
5878 {
5879         struct osd_device    *osd = osd_dev(obd->obd_lu_dev);
5880         struct lustre_handle  conn;
5881         int                   rc;
5882         ENTRY;
5883
5884         CDEBUG(D_CONFIG, "connect #%d\n", osd->od_connects);
5885
5886         rc = class_connect(&conn, obd, cluuid);
5887         if (rc)
5888                 RETURN(rc);
5889
5890         *exp = class_conn2export(&conn);
5891
5892         spin_lock(&osd->od_osfs_lock);
5893         osd->od_connects++;
5894         spin_unlock(&osd->od_osfs_lock);
5895
5896         RETURN(0);
5897 }
5898
5899 /*
5900  * once last export (we don't count self-export) disappeared
5901  * osd can be released
5902  */
5903 static int osd_obd_disconnect(struct obd_export *exp)
5904 {
5905         struct obd_device *obd = exp->exp_obd;
5906         struct osd_device *osd = osd_dev(obd->obd_lu_dev);
5907         int                rc, release = 0;
5908         ENTRY;
5909
5910         /* Only disconnect the underlying layers on the final disconnect. */
5911         spin_lock(&osd->od_osfs_lock);
5912         osd->od_connects--;
5913         if (osd->od_connects == 0)
5914                 release = 1;
5915         spin_unlock(&osd->od_osfs_lock);
5916
5917         rc = class_disconnect(exp); /* bz 9811 */
5918
5919         if (rc == 0 && release)
5920                 class_manual_cleanup(obd);
5921         RETURN(rc);
5922 }
5923
5924 static int osd_prepare(const struct lu_env *env, struct lu_device *pdev,
5925                        struct lu_device *dev)
5926 {
5927         struct osd_device *osd = osd_dev(dev);
5928         int                result = 0;
5929         ENTRY;
5930
5931         if (osd->od_quota_slave != NULL)
5932                 /* set up quota slave objects */
5933                 result = qsd_prepare(env, osd->od_quota_slave);
5934
5935         RETURN(result);
5936 }
5937
5938 static const struct lu_object_operations osd_lu_obj_ops = {
5939         .loo_object_init      = osd_object_init,
5940         .loo_object_delete    = osd_object_delete,
5941         .loo_object_release   = osd_object_release,
5942         .loo_object_free      = osd_object_free,
5943         .loo_object_print     = osd_object_print,
5944         .loo_object_invariant = osd_object_invariant
5945 };
5946
5947 const struct lu_device_operations osd_lu_ops = {
5948         .ldo_object_alloc      = osd_object_alloc,
5949         .ldo_process_config    = osd_process_config,
5950         .ldo_recovery_complete = osd_recovery_complete,
5951         .ldo_prepare           = osd_prepare,
5952 };
5953
5954 static const struct lu_device_type_operations osd_device_type_ops = {
5955         .ldto_init = osd_type_init,
5956         .ldto_fini = osd_type_fini,
5957
5958         .ldto_start = osd_type_start,
5959         .ldto_stop  = osd_type_stop,
5960
5961         .ldto_device_alloc = osd_device_alloc,
5962         .ldto_device_free  = osd_device_free,
5963
5964         .ldto_device_init    = osd_device_init,
5965         .ldto_device_fini    = osd_device_fini
5966 };
5967
5968 struct lu_device_type osd_device_type = {
5969         .ldt_tags     = LU_DEVICE_DT,
5970         .ldt_name     = LUSTRE_OSD_LDISKFS_NAME,
5971         .ldt_ops      = &osd_device_type_ops,
5972         .ldt_ctx_tags = LCT_LOCAL,
5973 };
5974
5975 /*
5976  * lprocfs legacy support.
5977  */
5978 static struct obd_ops osd_obd_device_ops = {
5979         .o_owner = THIS_MODULE,
5980         .o_connect      = osd_obd_connect,
5981         .o_disconnect   = osd_obd_disconnect
5982 };
5983
5984 static int __init osd_mod_init(void)
5985 {
5986         int rc;
5987
5988         osd_oi_mod_init();
5989
5990         rc = lu_kmem_init(ldiskfs_caches);
5991         if (rc)
5992                 return rc;
5993
5994         rc = class_register_type(&osd_obd_device_ops, NULL,
5995                                  lprocfs_osd_module_vars,
5996                                  LUSTRE_OSD_LDISKFS_NAME, &osd_device_type);
5997         if (rc)
5998                 lu_kmem_fini(ldiskfs_caches);
5999         return rc;
6000 }
6001
6002 static void __exit osd_mod_exit(void)
6003 {
6004         class_unregister_type(LUSTRE_OSD_LDISKFS_NAME);
6005         lu_kmem_fini(ldiskfs_caches);
6006 }
6007
6008 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
6009 MODULE_DESCRIPTION("Lustre Object Storage Device ("LUSTRE_OSD_LDISKFS_NAME")");
6010 MODULE_LICENSE("GPL");
6011
6012 cfs_module(osd, "0.1.0", osd_mod_init, osd_mod_exit);