Whamcloud - gitweb
LU-11266 build: update changelog for Ubuntu
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_handler.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2011, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/osd/osd_handler.c
33  *
34  * Top-level entry points into osd module
35  *
36  * Author: Nikita Danilov <nikita@clusterfs.com>
37  *         Pravin Shelar <pravin.shelar@sun.com> : Added fid in dirent
38  */
39
40 #define DEBUG_SUBSYSTEM S_OSD
41
42 #include <linux/kallsyms.h>
43 #include <linux/module.h>
44 #include <linux/user_namespace.h>
45 #ifdef HAVE_UIDGID_HEADER
46 # include <linux/uidgid.h>
47 #endif
48
49 /* prerequisite for linux/xattr.h */
50 #include <linux/types.h>
51 /* prerequisite for linux/xattr.h */
52 #include <linux/fs.h>
53 /* XATTR_{REPLACE,CREATE} */
54 #include <linux/xattr.h>
55
56 #include <ldiskfs/ldiskfs.h>
57 #include <ldiskfs/xattr.h>
58 #include <ldiskfs/ldiskfs_extents.h>
59 #undef ENTRY
60 /*
61  * struct OBD_{ALLOC,FREE}*()
62  * OBD_FAIL_CHECK
63  */
64 #include <obd_support.h>
65 /* struct ptlrpc_thread */
66 #include <lustre_net.h>
67 #include <lustre_fid.h>
68 /* process_config */
69 #include <uapi/linux/lustre/lustre_param.h>
70
71 #include "osd_internal.h"
72 #include "osd_dynlocks.h"
73
74 /* llo_* api support */
75 #include <md_object.h>
76 #include <lustre_quota.h>
77
78 #include <lustre_linkea.h>
79
80 /* Maximum EA size is limited by LNET_MTU for remote objects */
81 #define OSD_MAX_EA_SIZE 1048364
82
83 int ldiskfs_pdo = 1;
84 module_param(ldiskfs_pdo, int, 0644);
85 MODULE_PARM_DESC(ldiskfs_pdo, "ldiskfs with parallel directory operations");
86
87 int ldiskfs_track_declares_assert;
88 module_param(ldiskfs_track_declares_assert, int, 0644);
89 MODULE_PARM_DESC(ldiskfs_track_declares_assert, "LBUG during tracking of declares");
90
91 /* Slab to allocate dynlocks */
92 struct kmem_cache *dynlock_cachep;
93
94 /* Slab to allocate osd_it_ea */
95 struct kmem_cache *osd_itea_cachep;
96
97 static struct lu_kmem_descr ldiskfs_caches[] = {
98         {
99                 .ckd_cache = &dynlock_cachep,
100                 .ckd_name  = "dynlock_cache",
101                 .ckd_size  = sizeof(struct dynlock_handle)
102         },
103         {
104                 .ckd_cache = &osd_itea_cachep,
105                 .ckd_name  = "osd_itea_cache",
106                 .ckd_size  = sizeof(struct osd_it_ea)
107         },
108         {
109                 .ckd_cache = NULL
110         }
111 };
112
113 static const char dot[] = ".";
114 static const char dotdot[] = "..";
115
116 static const struct lu_object_operations      osd_lu_obj_ops;
117 static const struct dt_object_operations      osd_obj_ops;
118 static const struct dt_object_operations      osd_obj_otable_it_ops;
119 static const struct dt_index_operations       osd_index_iam_ops;
120 static const struct dt_index_operations       osd_index_ea_ops;
121
122 static int osd_remote_fid(const struct lu_env *env, struct osd_device *osd,
123                           const struct lu_fid *fid);
124 static int osd_process_scheduled_agent_removals(const struct lu_env *env,
125                                                 struct osd_device *osd);
126
127 int osd_trans_declare_op2rb[] = {
128         [OSD_OT_ATTR_SET]       = OSD_OT_ATTR_SET,
129         [OSD_OT_PUNCH]          = OSD_OT_MAX,
130         [OSD_OT_XATTR_SET]      = OSD_OT_XATTR_SET,
131         [OSD_OT_CREATE]         = OSD_OT_DESTROY,
132         [OSD_OT_DESTROY]        = OSD_OT_CREATE,
133         [OSD_OT_REF_ADD]        = OSD_OT_REF_DEL,
134         [OSD_OT_REF_DEL]        = OSD_OT_REF_ADD,
135         [OSD_OT_WRITE]          = OSD_OT_WRITE,
136         [OSD_OT_INSERT]         = OSD_OT_DELETE,
137         [OSD_OT_DELETE]         = OSD_OT_INSERT,
138         [OSD_OT_QUOTA]          = OSD_OT_MAX,
139 };
140
141 static int osd_has_index(const struct osd_object *obj)
142 {
143         return obj->oo_dt.do_index_ops != NULL;
144 }
145
/*
 * Evaluate osd_invariant() on the osd_object that embeds lu_object @l and
 * return its result.
 */
static int osd_object_invariant(const struct lu_object *l)
{
	return osd_invariant(osd_obj(l));
}
150
151 /*
152  * Concurrency: doesn't matter
153  */
154 static int osd_is_write_locked(const struct lu_env *env, struct osd_object *o)
155 {
156         struct osd_thread_info *oti = osd_oti_get(env);
157         return oti->oti_w_locks > 0 && o->oo_owner == env;
158 }
159
160 /*
161  * Concurrency: doesn't access mutable data
162  */
163 static int osd_root_get(const struct lu_env *env,
164                         struct dt_device *dev, struct lu_fid *f)
165 {
166         lu_local_obj_fid(f, OSD_FS_ROOT_OID);
167         return 0;
168 }
169
170 /*
171  * the following set of functions are used to maintain per-thread
172  * cache of FID->ino mapping. this mechanism is needed to resolve
173  * FID to inode at dt_insert() which in turn stores ino in the
174  * directory entries to keep ldiskfs compatible with ext[34].
175  * due to locking-originated restrictions we can't lookup ino
176  * using LU cache (deadlock is possible). lookup using OI is quite
177  * expensive. so instead we maintain this cache and methods like
178  * dt_create() fill it. so in the majority of cases dt_insert() is
179  * able to find needed mapping in lockless manner.
180  */
181 static struct osd_idmap_cache *
182 osd_idc_find(const struct lu_env *env, struct osd_device *osd,
183              const struct lu_fid *fid)
184 {
185         struct osd_thread_info  *oti   = osd_oti_get(env);
186         struct osd_idmap_cache  *idc    = oti->oti_ins_cache;
187         int i;
188         for (i = 0; i < oti->oti_ins_cache_used; i++) {
189                 if (!lu_fid_eq(&idc[i].oic_fid, fid))
190                         continue;
191                 if (idc[i].oic_dev != osd)
192                         continue;
193
194                 return idc + i;
195         }
196
197         return NULL;
198 }
199
200 static struct osd_idmap_cache *
201 osd_idc_add(const struct lu_env *env, struct osd_device *osd,
202             const struct lu_fid *fid)
203 {
204         struct osd_thread_info  *oti   = osd_oti_get(env);
205         struct osd_idmap_cache  *idc;
206         int i;
207
208         if (unlikely(oti->oti_ins_cache_used >= oti->oti_ins_cache_size)) {
209                 i = oti->oti_ins_cache_size * 2;
210                 if (i == 0)
211                         i = OSD_INS_CACHE_SIZE;
212                 OBD_ALLOC(idc, sizeof(*idc) * i);
213                 if (idc == NULL)
214                         return ERR_PTR(-ENOMEM);
215                 if (oti->oti_ins_cache != NULL) {
216                         memcpy(idc, oti->oti_ins_cache,
217                                oti->oti_ins_cache_used * sizeof(*idc));
218                         OBD_FREE(oti->oti_ins_cache,
219                                  oti->oti_ins_cache_used * sizeof(*idc));
220                 }
221                 oti->oti_ins_cache = idc;
222                 oti->oti_ins_cache_size = i;
223         }
224
225         idc = oti->oti_ins_cache + oti->oti_ins_cache_used++;
226         idc->oic_fid = *fid;
227         idc->oic_dev = osd;
228         idc->oic_lid.oii_ino = 0;
229         idc->oic_lid.oii_gen = 0;
230         idc->oic_remote = 0;
231
232         return idc;
233 }
234
235 /*
236  * lookup mapping for the given fid in the cache, initialize a
237  * new one if not found. the initialization checks whether the
238  * object is local or remote. for local objects, OI is used to
239  * learn ino/generation. the function is used when the caller
240  * has no information about the object, e.g. at dt_insert().
241  */
242 static struct osd_idmap_cache *
243 osd_idc_find_or_init(const struct lu_env *env, struct osd_device *osd,
244                      const struct lu_fid *fid)
245 {
246         struct osd_idmap_cache *idc;
247         int rc;
248
249         idc = osd_idc_find(env, osd, fid);
250         LASSERT(!IS_ERR(idc));
251         if (idc != NULL)
252                 return idc;
253
254         CDEBUG(D_INODE, "%s: FID "DFID" not in the id map cache\n",
255                osd->od_svname, PFID(fid));
256
257         /* new mapping is needed */
258         idc = osd_idc_add(env, osd, fid);
259         if (IS_ERR(idc)) {
260                 CERROR("%s: FID "DFID" add id map cache failed: %ld\n",
261                        osd->od_svname, PFID(fid), PTR_ERR(idc));
262                 return idc;
263         }
264
265         /* initialize it */
266         rc = osd_remote_fid(env, osd, fid);
267         if (unlikely(rc < 0))
268                 return ERR_PTR(rc);
269
270         if (rc == 0) {
271                 /* the object is local, lookup in OI */
272                 /* XXX: probably cheaper to lookup in LU first? */
273                 rc = osd_oi_lookup(osd_oti_get(env), osd, fid,
274                                    &idc->oic_lid, 0);
275                 if (unlikely(rc < 0)) {
276                         CERROR("can't lookup: rc = %d\n", rc);
277                         return ERR_PTR(rc);
278                 }
279         } else {
280                 /* the object is remote */
281                 idc->oic_remote = 1;
282         }
283
284         return idc;
285 }
286
287 /*
288  * lookup mapping for given FID and fill it from the given object.
289  * the object is lolcal by definition.
290  */
291 static int osd_idc_find_and_init(const struct lu_env *env,
292                                  struct osd_device *osd,
293                                  struct osd_object *obj)
294 {
295         const struct lu_fid     *fid = lu_object_fid(&obj->oo_dt.do_lu);
296         struct osd_idmap_cache  *idc;
297
298         idc = osd_idc_find(env, osd, fid);
299         LASSERT(!IS_ERR(idc));
300         if (idc != NULL) {
301                 if (obj->oo_inode == NULL)
302                         return 0;
303                 if (idc->oic_lid.oii_ino != obj->oo_inode->i_ino) {
304                         LASSERT(idc->oic_lid.oii_ino == 0);
305                         idc->oic_lid.oii_ino = obj->oo_inode->i_ino;
306                         idc->oic_lid.oii_gen = obj->oo_inode->i_generation;
307                 }
308                 return 0;
309         }
310
311         CDEBUG(D_INODE, "%s: FID "DFID" not in the id map cache\n",
312                osd->od_svname, PFID(fid));
313
314         /* new mapping is needed */
315         idc = osd_idc_add(env, osd, fid);
316         if (IS_ERR(idc)) {
317                 CERROR("%s: FID "DFID" add id map cache failed: %ld\n",
318                        osd->od_svname, PFID(fid), PTR_ERR(idc));
319                 return PTR_ERR(idc);
320         }
321
322         if (obj->oo_inode != NULL) {
323                 idc->oic_lid.oii_ino = obj->oo_inode->i_ino;
324                 idc->oic_lid.oii_gen = obj->oo_inode->i_generation;
325         }
326         return 0;
327 }
328
329 /*
330  * OSD object methods.
331  */
332
333 /*
334  * Concurrency: no concurrent access is possible that early in object
335  * life-cycle.
336  */
337 static struct lu_object *osd_object_alloc(const struct lu_env *env,
338                                           const struct lu_object_header *hdr,
339                                           struct lu_device *d)
340 {
341         struct osd_object *mo;
342
343         OBD_ALLOC_PTR(mo);
344         if (mo != NULL) {
345                 struct lu_object *l;
346                 struct lu_object_header *h;
347                 struct osd_device *o = osd_dev(d);
348
349                 l = &mo->oo_dt.do_lu;
350                 if (unlikely(o->od_in_init)) {
351                         OBD_ALLOC_PTR(h);
352                         if (!h) {
353                                 OBD_FREE_PTR(mo);
354                                 return NULL;
355                         }
356
357                         lu_object_header_init(h);
358                         lu_object_init(l, h, d);
359                         lu_object_add_top(h, l);
360                         mo->oo_header = h;
361                 } else {
362                         dt_object_init(&mo->oo_dt, NULL, d);
363                         mo->oo_header = NULL;
364                 }
365
366                 mo->oo_dt.do_ops = &osd_obj_ops;
367                 l->lo_ops = &osd_lu_obj_ops;
368                 init_rwsem(&mo->oo_sem);
369                 init_rwsem(&mo->oo_ext_idx_sem);
370                 spin_lock_init(&mo->oo_guard);
371                 INIT_LIST_HEAD(&mo->oo_xattr_list);
372                 return l;
373         } else {
374                 return NULL;
375         }
376 }
377
378 int osd_get_lma(struct osd_thread_info *info, struct inode *inode,
379                 struct dentry *dentry, struct lustre_ost_attrs *loa)
380 {
381         int rc;
382
383         rc = __osd_xattr_get(inode, dentry, XATTR_NAME_LMA,
384                              (void *)loa, sizeof(*loa));
385         if (rc > 0) {
386                 struct lustre_mdt_attrs *lma = &loa->loa_lma;
387
388                 if (rc < sizeof(*lma))
389                         return -EINVAL;
390
391                 rc = 0;
392                 lustre_loa_swab(loa, true);
393                 /* Check LMA compatibility */
394                 if (lma->lma_incompat & ~LMA_INCOMPAT_SUPP) {
395                         CWARN("%s: unsupported incompat LMA feature(s) %#x "
396                               "for fid = "DFID", ino = %lu\n",
397                               osd_ino2name(inode),
398                               lma->lma_incompat & ~LMA_INCOMPAT_SUPP,
399                               PFID(&lma->lma_self_fid), inode->i_ino);
400                         rc = -EOPNOTSUPP;
401                 }
402         } else if (rc == 0) {
403                 rc = -ENODATA;
404         }
405
406         return rc;
407 }
408
409 /*
410  * retrieve object from backend ext fs.
411  **/
412 struct inode *osd_iget(struct osd_thread_info *info, struct osd_device *dev,
413                        struct osd_inode_id *id)
414 {
415         int rc;
416         struct inode *inode = NULL;
417
418         /* if we look for an inode withing a running
419          * transaction, then we risk to deadlock */
420         /* osd_dirent_check_repair() breaks this */
421         /*LASSERT(current->journal_info == NULL);*/
422
423         inode = ldiskfs_iget(osd_sb(dev), id->oii_ino);
424         if (IS_ERR(inode)) {
425                 CDEBUG(D_INODE, "no inode: ino = %u, rc = %ld\n",
426                        id->oii_ino, PTR_ERR(inode));
427         } else if (id->oii_gen != OSD_OII_NOGEN &&
428                    inode->i_generation != id->oii_gen) {
429                 CDEBUG(D_INODE, "unmatched inode: ino = %u, oii_gen = %u, "
430                        "i_generation = %u\n",
431                        id->oii_ino, id->oii_gen, inode->i_generation);
432                 iput(inode);
433                 inode = ERR_PTR(-ESTALE);
434         } else if (inode->i_nlink == 0) {
435                 /* due to parallel readdir and unlink,
436                 * we can have dead inode here. */
437                 CDEBUG(D_INODE, "stale inode: ino = %u\n", id->oii_ino);
438                 iput(inode);
439                 inode = ERR_PTR(-ESTALE);
440         } else if (is_bad_inode(inode)) {
441                 CWARN("%s: bad inode: ino = %u\n",
442                 osd_dev2name(dev), id->oii_ino);
443                 iput(inode);
444                 inode = ERR_PTR(-ENOENT);
445         } else if ((rc = osd_attach_jinode(inode))) {
446                 iput(inode);
447                 inode = ERR_PTR(rc);
448         } else {
449                 ldiskfs_clear_inode_state(inode, LDISKFS_STATE_LUSTRE_DESTROY);
450                 if (id->oii_gen == OSD_OII_NOGEN)
451                         osd_id_gen(id, inode->i_ino, inode->i_generation);
452
453                 /* Do not update file c/mtime in ldiskfs.
454                  * NB: we don't have any lock to protect this because we don't
455                  * have reference on osd_object now, but contention with
456                  * another lookup + attr_set can't happen in the tiny window
457                  * between if (...) and set S_NOCMTIME. */
458                 if (!(inode->i_flags & S_NOCMTIME))
459                         inode->i_flags |= S_NOCMTIME;
460         }
461         return inode;
462 }
463
464 int osd_ldiskfs_add_entry(struct osd_thread_info *info, struct osd_device *osd,
465                           handle_t *handle, struct dentry *child,
466                           struct inode *inode, struct htree_lock *hlock)
467 {
468         int rc, rc2;
469
470         rc = __ldiskfs_add_entry(handle, child, inode, hlock);
471         if (rc == -ENOBUFS || rc == -ENOSPC) {
472                 struct lustre_ost_attrs *loa = &info->oti_ost_attrs;
473                 struct inode *parent = child->d_parent->d_inode;
474                 struct lu_fid *fid = NULL;
475
476                 rc2 = osd_get_lma(info, parent, child->d_parent, loa);
477                 if (!rc2) {
478                         fid = &loa->loa_lma.lma_self_fid;
479                 } else if (rc2 == -ENODATA) {
480                         if (unlikely(parent == inode->i_sb->s_root->d_inode)) {
481                                 fid = &info->oti_fid3;
482                                 lu_local_obj_fid(fid, OSD_FS_ROOT_OID);
483                         } else if (!osd->od_is_ost && osd->od_index == 0) {
484                                 fid = &info->oti_fid3;
485                                 lu_igif_build(fid, parent->i_ino,
486                                               parent->i_generation);
487                         }
488                 }
489
490                 if (fid != NULL)
491                         CWARN("%s: directory (inode: %lu, FID: "DFID") %s "
492                               "maximum entry limit\n",
493                               osd_name(osd), parent->i_ino, PFID(fid),
494                               rc == -ENOSPC ? "has reached" : "is approaching");
495                 else
496                         CWARN("%s: directory (inode: %lu, FID: unknown) %s "
497                               "maximum entry limit\n",
498                               osd_name(osd), parent->i_ino,
499                               rc == -ENOSPC ? "has reached" : "is approaching");
500
501                 /* ignore such error now */
502                 if (rc == -ENOBUFS)
503                         rc = 0;
504         }
505
506         return rc;
507 }
508
509
510 struct inode *
511 osd_iget_fid(struct osd_thread_info *info, struct osd_device *dev,
512              struct osd_inode_id *id, struct lu_fid *fid)
513 {
514         struct lustre_ost_attrs *loa = &info->oti_ost_attrs;
515         struct inode *inode;
516         int rc;
517
518         inode = osd_iget(info, dev, id);
519         if (IS_ERR(inode))
520                 return inode;
521
522         rc = osd_get_lma(info, inode, &info->oti_obj_dentry, loa);
523         if (!rc) {
524                 *fid = loa->loa_lma.lma_self_fid;
525         } else if (rc == -ENODATA) {
526                 if (unlikely(inode == osd_sb(dev)->s_root->d_inode))
527                         lu_local_obj_fid(fid, OSD_FS_ROOT_OID);
528                 else
529                         lu_igif_build(fid, inode->i_ino, inode->i_generation);
530         } else {
531                 iput(inode);
532                 inode = ERR_PTR(rc);
533         }
534         return inode;
535 }
536
537 static struct inode *osd_iget_check(struct osd_thread_info *info,
538                                     struct osd_device *dev,
539                                     const struct lu_fid *fid,
540                                     struct osd_inode_id *id,
541                                     bool trusted)
542 {
543         struct inode *inode;
544         int rc = 0;
545         ENTRY;
546
547         /* The cached OI mapping is trustable. If we cannot locate the inode
548          * via the cached OI mapping, then return the failure to the caller
549          * directly without further OI checking. */
550
551 again:
552         inode = ldiskfs_iget(osd_sb(dev), id->oii_ino);
553         if (IS_ERR(inode)) {
554                 rc = PTR_ERR(inode);
555                 if (!trusted && (rc == -ENOENT || rc == -ESTALE))
556                         goto check_oi;
557
558                 CDEBUG(D_INODE, "no inode for FID: "DFID", ino = %u, rc = %d\n",
559                        PFID(fid), id->oii_ino, rc);
560                 GOTO(put, rc);
561         }
562
563         if (is_bad_inode(inode)) {
564                 rc = -ENOENT;
565                 if (!trusted)
566                         goto check_oi;
567
568                 CDEBUG(D_INODE, "bad inode for FID: "DFID", ino = %u\n",
569                        PFID(fid), id->oii_ino);
570                 GOTO(put, rc);
571         }
572
573         if (id->oii_gen != OSD_OII_NOGEN &&
574             inode->i_generation != id->oii_gen) {
575                 rc = -ESTALE;
576                 if (!trusted)
577                         goto check_oi;
578
579                 CDEBUG(D_INODE, "unmatched inode for FID: "DFID", ino = %u, "
580                        "oii_gen = %u, i_generation = %u\n", PFID(fid),
581                        id->oii_ino, id->oii_gen, inode->i_generation);
582                 GOTO(put, rc);
583         }
584
585         if (inode->i_nlink == 0) {
586                 rc = -ENOENT;
587                 if (!trusted)
588                         goto check_oi;
589
590                 CDEBUG(D_INODE, "stale inode for FID: "DFID", ino = %u\n",
591                        PFID(fid), id->oii_ino);
592                 GOTO(put, rc);
593         }
594
595         ldiskfs_clear_inode_state(inode, LDISKFS_STATE_LUSTRE_DESTROY);
596
597 check_oi:
598         if (rc != 0) {
599                 __u32 saved_ino = id->oii_ino;
600                 __u32 saved_gen = id->oii_gen;
601
602                 LASSERT(!trusted);
603                 LASSERTF(rc == -ESTALE || rc == -ENOENT, "rc = %d\n", rc);
604
605                 rc = osd_oi_lookup(info, dev, fid, id, OI_CHECK_FLD);
606                 /* XXX: There are four possible cases:
607                  *      1. rc = 0.
608                  *         Backup/restore caused the OI invalid.
609                  *      2. rc = 0.
610                  *         Someone unlinked the object but NOT removed
611                  *         the OI mapping, such as mount target device
612                  *         as ldiskfs, and modify something directly.
613                  *      3. rc = -ENOENT.
614                  *         Someone just removed the object between the
615                  *         former oi_lookup and the iget. It is normal.
616                  *      4. Other failure cases.
617                  *
618                  *      Generally, when the device is mounted, it will
619                  *      auto check whether the system is restored from
620                  *      file-level backup or not. We trust such detect
621                  *      to distinguish the 1st case from the 2nd case:
622                  *      if the OI files are consistent but may contain
623                  *      stale OI mappings because of case 2, if iget()
624                  *      returns -ENOENT or -ESTALE, then it should be
625                  *      the case 2. */
626                 if (rc != 0)
627                         /* If the OI mapping was in OI file before the
628                          * osd_iget_check(), but now, it is disappear,
629                          * then it must be removed by race. That is a
630                          * normal race case. */
631                         GOTO(put, rc);
632
633                 /* It is the OI scrub updated the OI mapping by race.
634                  * The new OI mapping must be valid. */
635                 if (saved_ino != id->oii_ino ||
636                     (saved_gen != id->oii_gen && saved_gen != OSD_OII_NOGEN)) {
637                         if (!IS_ERR(inode))
638                                 iput(inode);
639
640                         trusted = true;
641                         goto again;
642                 }
643
644                 if (IS_ERR(inode)) {
645                         if (dev->od_scrub.os_scrub.os_file.sf_flags &
646                             SF_INCONSISTENT)
647                                 /* It still can be the case 2, but we cannot
648                                  * distinguish it from the case 1. So return
649                                  * -EREMCHG to block current operation until
650                                  *  OI scrub rebuilt the OI mappings. */
651                                 rc = -EREMCHG;
652                         else
653                                 rc = -ENOENT;
654
655                         GOTO(put, rc);
656                 }
657
658                 if (inode->i_generation == id->oii_gen)
659                         rc = -ENOENT;
660                 else
661                         rc = -EREMCHG;
662         } else {
663                 if (id->oii_gen == OSD_OII_NOGEN)
664                         osd_id_gen(id, inode->i_ino, inode->i_generation);
665
666                 /* Do not update file c/mtime in ldiskfs.
667                  * NB: we don't have any lock to protect this because we don't
668                  * have reference on osd_object now, but contention with
669                  * another lookup + attr_set can't happen in the tiny window
670                  * between if (...) and set S_NOCMTIME. */
671                 if (!(inode->i_flags & S_NOCMTIME))
672                         inode->i_flags |= S_NOCMTIME;
673         }
674
675         GOTO(put, rc);
676
677 put:
678         if (rc != 0) {
679                 if (!IS_ERR(inode))
680                         iput(inode);
681
682                 inode = ERR_PTR(rc);
683         }
684
685         return inode;
686 }
687
688 /**
689  * \retval +v: new filter_fid, does not contain self-fid
690  * \retval 0:  filter_fid_old, contains self-fid
691  * \retval -v: other failure cases
692  */
693 int osd_get_idif(struct osd_thread_info *info, struct inode *inode,
694                  struct dentry *dentry, struct lu_fid *fid)
695 {
696         struct filter_fid_old   *ff     = &info->oti_ff;
697         struct ost_id           *ostid  = &info->oti_ostid;
698         int                      rc;
699
700         rc = __osd_xattr_get(inode, dentry, XATTR_NAME_FID, ff, sizeof(*ff));
701         if (rc == sizeof(*ff)) {
702                 rc = 0;
703                 ostid_set_seq(ostid, le64_to_cpu(ff->ff_seq));
704                 rc = ostid_set_id(ostid, le64_to_cpu(ff->ff_objid));
705                 /*
706                  * XXX: use 0 as the index for compatibility, the caller will
707                  *      handle index related issues when necessary.
708                  */
709                 if (!rc)
710                         ostid_to_fid(fid, ostid, 0);
711         } else if (rc == sizeof(struct filter_fid)) {
712                 rc = 1;
713         } else if (rc >= 0) {
714                 rc = -EINVAL;
715         }
716
717         return rc;
718 }
719
720 static int osd_lma_self_repair(struct osd_thread_info *info,
721                                struct osd_device *osd, struct inode *inode,
722                                const struct lu_fid *fid, __u32 compat)
723 {
724         handle_t *jh;
725         int       rc;
726
727         LASSERT(current->journal_info == NULL);
728
729         jh = osd_journal_start_sb(osd_sb(osd), LDISKFS_HT_MISC,
730                                   osd_dto_credits_noquota[DTO_XATTR_SET]);
731         if (IS_ERR(jh)) {
732                 rc = PTR_ERR(jh);
733                 CWARN("%s: cannot start journal for lma_self_repair: rc = %d\n",
734                       osd_name(osd), rc);
735                 return rc;
736         }
737
738         rc = osd_ea_fid_set(info, inode, fid, compat, 0);
739         if (rc != 0)
740                 CWARN("%s: cannot self repair the LMA: rc = %d\n",
741                       osd_name(osd), rc);
742         ldiskfs_journal_stop(jh);
743         return rc;
744 }
745
746 static int osd_check_lma(const struct lu_env *env, struct osd_object *obj)
747 {
748         struct osd_thread_info  *info   = osd_oti_get(env);
749         struct osd_device       *osd    = osd_obj2dev(obj);
750         struct lustre_ost_attrs *loa    = &info->oti_ost_attrs;
751         struct lustre_mdt_attrs *lma    = &loa->loa_lma;
752         struct inode            *inode  = obj->oo_inode;
753         struct dentry           *dentry = &info->oti_obj_dentry;
754         struct lu_fid           *fid    = NULL;
755         const struct lu_fid     *rfid   = lu_object_fid(&obj->oo_dt.do_lu);
756         int                      rc;
757         ENTRY;
758
759         rc = __osd_xattr_get(inode, dentry, XATTR_NAME_LMA,
760                              (void *)loa, sizeof(*loa));
761         if (rc == -ENODATA && !fid_is_igif(rfid) && osd->od_check_ff) {
762                 fid = &lma->lma_self_fid;
763                 rc = osd_get_idif(info, inode, dentry, fid);
764                 if ((rc > 0) || (rc == -ENODATA && osd->od_index_in_idif)) {
765                         /* For the given OST-object, if it has neither LMA nor
766                          * FID in XATTR_NAME_FID, then the given FID (which is
767                          * contained in the @obj, from client RPC for locating
768                          * the OST-object) is trusted. We use it to generate
769                          * the LMA. */
770                         osd_lma_self_repair(info, osd, inode, rfid,
771                                             LMAC_FID_ON_OST);
772                         RETURN(0);
773                 }
774         }
775
776         if (rc < 0)
777                 RETURN(rc);
778
779         if (rc > 0) {
780                 rc = 0;
781                 lustre_lma_swab(lma);
782                 if (unlikely((lma->lma_incompat & ~LMA_INCOMPAT_SUPP) ||
783                              (CFS_FAIL_CHECK(OBD_FAIL_OSD_LMA_INCOMPAT) &&
784                               S_ISREG(inode->i_mode)))) {
785                         CWARN("%s: unsupported incompat LMA feature(s) %#x for "
786                               "fid = "DFID", ino = %lu\n", osd_name(osd),
787                               lma->lma_incompat & ~LMA_INCOMPAT_SUPP,
788                               PFID(rfid), inode->i_ino);
789                         rc = -EOPNOTSUPP;
790                 } else {
791                         fid = &lma->lma_self_fid;
792                         if (lma->lma_compat & LMAC_STRIPE_INFO &&
793                             osd->od_is_ost)
794                                 obj->oo_pfid_in_lma = 1;
795                         if (unlikely(lma->lma_incompat & LMAI_REMOTE_PARENT) &&
796                             !osd->od_is_ost)
797                                 lu_object_set_agent_entry(&obj->oo_dt.do_lu);
798                 }
799         }
800
801         if (fid != NULL && unlikely(!lu_fid_eq(rfid, fid))) {
802                 if (fid_is_idif(rfid) && fid_is_idif(fid)) {
803                         struct ost_id   *oi   = &info->oti_ostid;
804                         struct lu_fid   *fid1 = &info->oti_fid3;
805                         __u32            idx  = fid_idif_ost_idx(rfid);
806
807                         /* For old IDIF, the OST index is not part of the IDIF,
808                          * Means that different OSTs may have the same IDIFs.
809                          * Under such case, we need to make some compatible
810                          * check to make sure to trigger OI scrub properly. */
811                         if (idx != 0 && fid_idif_ost_idx(fid) == 0) {
812                                 /* Given @rfid is new, LMA is old. */
813                                 fid_to_ostid(fid, oi);
814                                 ostid_to_fid(fid1, oi, idx);
815                                 if (lu_fid_eq(fid1, rfid)) {
816                                         if (osd->od_index_in_idif)
817                                                 osd_lma_self_repair(info, osd,
818                                                         inode, rfid,
819                                                         LMAC_FID_ON_OST);
820                                         RETURN(0);
821                                 }
822                         }
823                 }
824
825                 rc = -EREMCHG;
826         }
827
828         RETURN(rc);
829 }
830
/*
 * Scan state shared between osd_check_lmv() and its directory-scan
 * callback osd_stripe_dir_filldir().
 */
831 struct osd_check_lmv_buf {
832 #ifdef HAVE_DIR_CONTEXT
833         /* please keep it as first member */
            /* (osd_stripe_dir_filldir() casts the callback argument straight
             * back to struct osd_check_lmv_buf, so ctx has to sit at offset 0) */
834         struct dir_context       ctx;
835 #endif
            /* per-thread info; supplies the FID/id scratch buffers and env */
836         struct osd_thread_info  *oclb_info;
            /* device handed to the lookup/cache helpers by the callback */
837         struct osd_device       *oclb_dev;
            /* cache entry the callback fills with the located FID => id pair */
838         struct osd_idmap_cache  *oclb_oic;
            /* entries seen by the callback during the current readdir pass */
839         int                      oclb_items;
            /* set by the callback once a matching slave object was located */
840         bool                     oclb_found;
841 };
842
843 /**
844  * It is called internally by ->readdir() to filter out the
845  * local slave object's FID of the striped directory.
846  *
     * Entries whose name begins with '.' are ignored. For any other entry a
     * FID is parsed from the text that follows the first character of the
     * name; entries that do not yield a sane FID, or whose FID is reported
     * as remote by osd_remote_fid(), are ignored as well. On a match the
     * inode is fetched with osd_iget() and released again at once, then the
     * FID => inode mapping is added to the OI cache, copied into oclb_oic
     * and passed to osd_oii_insert().
     *
     * \param[in] buf      the struct osd_check_lmv_buf driving this scan
     * \param[in] name     directory entry name
     * \param[in] namelen  length of \a name (not used by this callback)
     * \param[in] offset   entry offset (not used by this callback)
     * \param[in] ino      inode number of the entry
     * \param[in] d_type   entry type (not used by this callback)
     *
847  * \retval      1 found the local slave's FID
848  * \retval      0 continue to check next item
849  * \retval      -ve for failure
850  */
851 #ifdef HAVE_FILLDIR_USE_CTX
852 static int osd_stripe_dir_filldir(struct dir_context *buf,
853 #else
854 static int osd_stripe_dir_filldir(void *buf,
855 #endif
856                                   const char *name, int namelen,
857                                   loff_t offset, __u64 ino, unsigned d_type)
858 {
859         struct osd_check_lmv_buf *oclb = (struct osd_check_lmv_buf *)buf;
860         struct osd_thread_info *oti = oclb->oclb_info;
861         struct lu_fid *fid = &oti->oti_fid3;
862         struct osd_inode_id *id = &oti->oti_id3;
863         struct osd_device *dev = oclb->oclb_dev;
864         struct osd_idmap_cache *oic = oclb->oclb_oic;
865         struct inode *inode;
866
            /* Counted before any filtering: the caller uses a zero count to
             * detect that a readdir pass delivered no entries at all. */
867         oclb->oclb_items++;
868
869         if (name[0] == '.')
870                 return 0;
871
            /* The sscanf() result is not checked; the FID is zeroed first so
             * that a failed parse is rejected by fid_is_sane() below.
             * NOTE(review): name + 1 presumably skips the leading '[' of a
             * "[FID]:index" stripe name - confirm against the name format. */
872         fid_zero(fid);
873         sscanf(name + 1, SFID, RFID(fid));
874         if (!fid_is_sane(fid))
875                 return 0;
876
877         if (osd_remote_fid(oti->oti_env, dev, fid))
878                 return 0;
879
880         osd_id_gen(id, ino, OSD_OII_NOGEN);
881         inode = osd_iget(oti, dev, id);
882         if (IS_ERR(inode))
883                 return PTR_ERR(inode);
884
            /* The inode reference itself is not needed; only the mapping
             * recorded below is kept. */
885         iput(inode);
886         osd_add_oi_cache(oti, dev, id, fid);
887         oic->oic_fid = *fid;
888         oic->oic_lid = *id;
889         oic->oic_dev = dev;
890         osd_oii_insert(dev, oic, true);
891         oclb->oclb_found = true;
892
893         return 1;
894 }
895
896 /* When lookup item under striped directory, we need to locate the master
897  * MDT-object of the striped directory firstly, then the client will send
898  * lookup (getattr_by_name) RPC to the MDT with some slave MDT-object's FID
899  * and the item's name. If the system is restored from MDT file level backup,
900  * then before the OI scrub completely built the OI files, the OI mappings of
901  * the master MDT-object and slave MDT-object may be invalid. Usually, it is
902  * not a problem for the master MDT-object. Because when locate the master
903  * MDT-object, we will do name based lookup (for the striped directory itself)
904  * firstly, during such process we can setup the correct OI mapping for the
905  * master MDT-object. But it will be trouble for the slave MDT-object. Because
906  * the client will not trigger name based lookup on the MDT to locate the slave
907  * MDT-object before locating item under the striped directory, then when
908  * osd_fid_lookup(), it will find that the OI mapping for the slave MDT-object
909  * is invalid and does not know what the right OI mapping is, then the MDT has
910  * to return -EINPROGRESS to the client to notify that the OI scrub is rebuilding
911  * the OI file, related OI mapping is unknown yet, please try again later. And
912  * then client will re-try the RPC again and again until related OI mapping has
913  * been updated. That is quite inefficient.
914  *
915  * To resolve above trouble, we will handle it as the following two cases:
916  *
917  * 1) The slave MDT-object and the master MDT-object are on different MDTs.
918  *    It is relative easy. Be as one of remote MDT-objects, the slave MDT-object
919  *    is linked under /REMOTE_PARENT_DIR with the name of its FID string.
920  *    We can locate the slave MDT-object via lookup the /REMOTE_PARENT_DIR
921  *    directly. Please check osd_fid_lookup().
922  *
923  * 2) The slave MDT-object and the master MDT-object reside on the same MDT.
924  *    Under such case, during lookup the master MDT-object, we will lookup the
925  *    slave MDT-object via readdir against the master MDT-object, because the
926  *    slave MDT-objects information are stored as sub-directories with the name
927  *    "${FID}:${index}". Then when find the local slave MDT-object, its OI
928  *    mapping will be recorded. Then subsequent osd_fid_lookup() will know
929  *    the correct OI mapping for the slave MDT-object. */
     *
     * This function implements case 2): it reads the LMV xattr of \a inode
     * and, when it carries LMV_MAGIC_V1, iterates the directory with
     * osd_stripe_dir_filldir() until a local slave is found, the directory
     * is exhausted or an error occurs.
     *
     * Returns 0 when there is nothing to do or the scan finished (positive
     * intermediate results are folded to 0), or a negative errno, which is
     * also reported through CDEBUG(D_LFSCK). */
930 static int osd_check_lmv(struct osd_thread_info *oti, struct osd_device *dev,
931                          struct inode *inode, struct osd_idmap_cache *oic)
932 {
933         struct lu_buf *buf = &oti->oti_big_buf;
934         struct dentry *dentry = &oti->oti_obj_dentry;
935         struct file *filp = &oti->oti_file;
936         const struct file_operations *fops;
937         struct lmv_mds_md_v1 *lmv1;
938         struct osd_check_lmv_buf oclb = {
939 #ifdef HAVE_DIR_CONTEXT
940                 .ctx.actor = osd_stripe_dir_filldir,
941 #endif
942                 .oclb_info = oti,
943                 .oclb_dev = dev,
944                 .oclb_oic = oic,
945                 .oclb_found = false,
946         };
947         int rc = 0;
948         ENTRY;
949
950 again:
951         rc = __osd_xattr_get(inode, dentry, XATTR_NAME_LMV, buf->lb_buf,
952                              buf->lb_len);
953         if (rc == -ERANGE) {
                    /* Buffer too small: query with a NULL buffer (a positive
                     * result is used as the needed size), grow, and retry. */
954                 rc = __osd_xattr_get(inode, dentry, XATTR_NAME_LMV, NULL, 0);
955                 if (rc > 0) {
956                         lu_buf_realloc(buf, rc);
957                         if (buf->lb_buf == NULL)
958                                 GOTO(out, rc = -ENOMEM);
959
960                         goto again;
961                 }
962         }
963
            /* No (or empty) LMV xattr: nothing to scan for. */
964         if (unlikely(rc == 0 || rc == -ENODATA))
965                 GOTO(out, rc = 0);
966
967         if (rc < 0)
968                 GOTO(out, rc);
969
            /* A positive rc with no buffer at all: allocate rc bytes and
             * read again so that the value can be inspected below. */
970         if (unlikely(buf->lb_buf == NULL)) {
971                 lu_buf_realloc(buf, rc);
972                 if (buf->lb_buf == NULL)
973                         GOTO(out, rc = -ENOMEM);
974
975                 goto again;
976         }
977
            /* Only an LMV_MAGIC_V1 value triggers the directory scan. */
978         lmv1 = buf->lb_buf;
979         if (le32_to_cpu(lmv1->lmv_magic) != LMV_MAGIC_V1)
980                 GOTO(out, rc = 0);
981
            /* Set up the per-thread dentry/file pair by hand so that the
             * directory's own iterate/readdir method can be called on it. */
982         fops = inode->i_fop;
983         dentry->d_inode = inode;
984         dentry->d_sb = inode->i_sb;
985         filp->f_pos = 0;
986         filp->f_path.dentry = dentry;
987         filp->f_mode = FMODE_64BITHASH;
988         filp->f_mapping = inode->i_mapping;
989         filp->f_op = fops;
990         filp->private_data = NULL;
991         set_file_inode(filp, inode);
992
            /* Keep iterating while the last pass delivered entries, nothing
             * was found, no error occurred and the end position has not been
             * reached. */
993         do {
994                 oclb.oclb_items = 0;
995 #ifdef HAVE_DIR_CONTEXT
996                 oclb.ctx.pos = filp->f_pos;
997 #ifdef HAVE_ITERATE_SHARED
998                 rc = fops->iterate_shared(filp, &oclb.ctx);
999 #else
1000                 rc = fops->iterate(filp, &oclb.ctx);
1001 #endif
1002                 filp->f_pos = oclb.ctx.pos;
1003 #else
1004                 rc = fops->readdir(filp, &oclb, osd_stripe_dir_filldir);
1005 #endif
1006         } while (rc >= 0 && oclb.oclb_items > 0 && !oclb.oclb_found &&
1007                  filp->f_pos != LDISKFS_HTREE_EOF_64BIT);
1008         fops->release(inode, filp);
1009
1010 out:
             /* Any non-negative result (including an xattr length or the
              * iterate return value) is reported to the caller as 0. */
1011         if (rc < 0)
1012                 CDEBUG(D_LFSCK, "%s: fail to check LMV EA, inode = %lu/%u,"
1013                        DFID": rc = %d\n", osd_ino2name(inode),
1014                        inode->i_ino, inode->i_generation,
1015                        PFID(&oic->oic_fid), rc);
1016         else
1017                 rc = 0;
1018
1019         RETURN(rc);
1020 }
1021
/*
 * Locate the inode backing \a fid and attach it to \a obj (obj->oo_inode).
 *
 * The FID => inode id mapping is searched in this order: the per-thread
 * cache, the OI scrub pending list, then the OI files. The inode is loaded
 * with osd_iget_check() and, when it was not taken from a trusted source,
 * cross-checked against its LMA via osd_check_lma(). If the mapping is
 * missing or inconsistent, "/REMOTE_PARENT_DIR" is tried as a fallback and
 * the OI scrub may be started (or joined).
 *
 * \param[in] env   execution environment
 * \param[in] obj   object to initialize; obj->oo_inode must be NULL on entry
 * \param[in] fid   FID to look up
 * \param[in] conf  optional; LOC_F_NEW in loc_flags skips the lookup
 *
 * \retval 0             success; obj->oo_inode is either the located inode
 *                       or NULL when no object exists for \a fid
 * \retval -EINPROGRESS  the OI scrub is running or was just triggered and
 *                       the mapping is not known yet
 * \retval -EREMCHG      the mapping is inconsistent and no scrub could
 *                       resolve it
 * \retval -ve           other failures
 *
 * On failure, or when no inode was attached, any inode reference taken here
 * is dropped and, if the mapping came from a trusted source, the per-thread
 * cache entry is invalidated.
 */
1022 static int osd_fid_lookup(const struct lu_env *env, struct osd_object *obj,
1023                           const struct lu_fid *fid,
1024                           const struct lu_object_conf *conf)
1025 {
1026         struct osd_thread_info *info;
1027         struct lu_device *ldev = obj->oo_dt.do_lu.lo_dev;
1028         struct osd_device *dev;
1029         struct osd_idmap_cache *oic;
1030         struct osd_inode_id *id;
1031         struct inode *inode = NULL;
1032         struct lustre_scrub *scrub;
1033         struct scrub_file *sf;
1034         __u32 flags = SS_CLEAR_DRYRUN | SS_CLEAR_FAILOUT | SS_AUTO_FULL;
1035         __u32 saved_ino;
1036         __u32 saved_gen;
1037         int result = 0;
1038         int rc1 = 0;
             /* remote:  the id came from osd_lookup_in_remote_parent()
              * trusted: the id came from a source other than the OI files
              * updated: the OI mapping was seen to change during this lookup
              * checked: the check_lma label has been visited */
1039         bool remote = false;
1040         bool trusted = true;
1041         bool updated = false;
1042         bool checked = false;
1043         ENTRY;
1044
1045         LINVRNT(osd_invariant(obj));
1046         LASSERT(obj->oo_inode == NULL);
1047         LASSERTF(fid_is_sane(fid) || fid_is_idif(fid), DFID"\n", PFID(fid));
1048
1049         dev = osd_dev(ldev);
1050         scrub = &dev->od_scrub.os_scrub;
1051         sf = &scrub->os_file;
1052         info = osd_oti_get(env);
1053         LASSERT(info);
1054         oic = &info->oti_cache;
1055
1056         if (OBD_FAIL_CHECK(OBD_FAIL_SRV_ENOENT))
1057                 RETURN(-ENOENT);
1058
1059         /* For the object is created as locking anchor, or for the object to
1060          * be created on disk. No need to osd_oi_lookup() at here because FID
1061          * should never be re-used, if it's really a duplicate FID from
1062          * unexpected reason, we should be able to detect it later by calling
1063          * do_create->osd_oi_insert(). */
1064         if (conf && conf->loc_flags & LOC_F_NEW)
1065                 GOTO(out, result = 0);
1066
1067         /* Search order: 1. per-thread cache. */
1068         if (lu_fid_eq(fid, &oic->oic_fid) && likely(oic->oic_dev == dev)) {
1069                 id = &oic->oic_lid;
1070                 goto iget;
1071         }
1072
1073         id = &info->oti_id;
1074         if (!list_empty(&scrub->os_inconsistent_items)) {
1075                 /* Search order: 2. OI scrub pending list. */
1076                 result = osd_oii_lookup(dev, fid, id);
1077                 if (!result)
1078                         goto iget;
1079         }
1080
1081         /* The OI mapping in the OI file can be updated by the OI scrub
1082          * when we locate the inode via FID. So it may be not trustable. */
1083         trusted = false;
1084
1085         /* Search order: 3. OI files. */
1086         result = osd_oi_lookup(info, dev, fid, id, OI_CHECK_FLD);
1087         if (result == -ENOENT) {
                     /* No mapping: report "no such object" (result 0, no
                      * inode) unless all three conditions below fail, in
                      * which case the fallback at "trigger" is tried. */
1088                 if (!(fid_is_norm(fid) || fid_is_igif(fid)) ||
1089                     fid_is_on_ost(info, dev, fid, OI_CHECK_FLD) ||
1090                     !ldiskfs_test_bit(osd_oi_fid2idx(dev,fid),
1091                                       sf->sf_oi_bitmap))
1092                         GOTO(out, result = 0);
1093
1094                 goto trigger;
1095         }
1096
1097         if (result)
1098                 GOTO(out, result);
1099
1100 iget:
1101         obj->oo_inode = NULL;
1102         /* for later passes through checks, not true on first pass */
1103         if (!IS_ERR_OR_NULL(inode))
1104                 iput(inode);
1105
1106         inode = osd_iget_check(info, dev, fid, id, trusted);
1107         if (!IS_ERR(inode)) {
1108                 obj->oo_inode = inode;
1109                 result = 0;
                     /* An inode found through the remote parent directory
                      * goes to "trigger" instead of the LMA check. */
1110                 if (remote)
1111                         goto trigger;
1112
1113                 goto check_lma;
1114         }
1115
1116         result = PTR_ERR(inode);
1117         if (result == -ENOENT || result == -ESTALE)
1118                 GOTO(out, result = 0);
1119
1120         if (result != -EREMCHG)
1121                 GOTO(out, result);
1122
1123 trigger:
1124         /* We still have chance to get the valid inode: for the
1125          * object which is referenced by remote name entry, the
1126          * object on the local MDT will be linked under the dir
1127          * of "/REMOTE_PARENT_DIR" with its FID string as name.
1128          *
1129          * We do not know whether the object for the given FID
1130          * is referenced by some remote name entry or not, and
1131          * especially for DNE II, a multiple-linked object may
1132          * have many name entries reside on many MDTs.
1133          *
1134          * To simplify the operation, OSD will not distinguish
1135          * more, just lookup "/REMOTE_PARENT_DIR". Usually, it
1136          * only happened for the RPC from other MDT during the
1137          * OI scrub, or for the client side RPC with FID only,
1138          * such as FID to path, or from old connected client. */
1139         if (!remote) {
1140                 rc1 = osd_lookup_in_remote_parent(info, dev, fid, id);
1141                 if (!rc1) {
1142                         remote = true;
1143                         trusted = true;
1144                         flags |= SS_AUTO_PARTIAL;
1145                         flags &= ~SS_AUTO_FULL;
1146                         goto iget;
1147                 }
1148         }
1149
1150         if (thread_is_running(&scrub->os_thread)) {
1151                 if (scrub->os_partial_scan && !scrub->os_in_join)
1152                         goto join;
1153
1154                 if (IS_ERR_OR_NULL(inode) || result)
1155                         GOTO(out, result = -EINPROGRESS);
1156
1157                 LASSERT(remote);
1158                 LASSERT(obj->oo_inode == inode);
1159
1160                 osd_add_oi_cache(info, dev, id, fid);
1161                 osd_oii_insert(dev, oic, true);
1162                 goto found;
1163         }
1164
1165         if (dev->od_auto_scrub_interval == AS_NEVER) {
1166                 if (!remote)
1167                         GOTO(out, result = -EREMCHG);
1168
1169                 LASSERT(!result);
1170                 LASSERT(obj->oo_inode == inode);
1171
1172                 osd_add_oi_cache(info, dev, id, fid);
1173                 goto found;
1174         }
1175
1176 join:
1177         rc1 = osd_scrub_start(env, dev, flags);
1178         LCONSOLE_WARN("%s: trigger OI scrub by RPC for the " DFID" with flags "
1179                       "0x%x, rc = %d\n", osd_name(dev), PFID(fid), flags, rc1);
1180         if (rc1 && rc1 != -EALREADY)
1181                 GOTO(out, result = -EREMCHG);
1182
1183         if (IS_ERR_OR_NULL(inode) || result)
1184                 GOTO(out, result = -EINPROGRESS);
1185
1186         LASSERT(remote);
1187         LASSERT(obj->oo_inode == inode);
1188
1189         osd_add_oi_cache(info, dev, id, fid);
1190         osd_oii_insert(dev, oic, true);
1191         goto found;
1192
1193 check_lma:
1194         checked = true;
1195         if (unlikely(obj->oo_header))
1196                 goto found;
1197
1198         result = osd_check_lma(env, obj);
1199         if (!result)
1200                 goto found;
1201
1202         LASSERTF(id->oii_ino == inode->i_ino &&
1203                  id->oii_gen == inode->i_generation,
1204                  "locate wrong inode for FID: "DFID", %u/%u => %ld/%u\n",
1205                  PFID(fid), id->oii_ino, id->oii_gen,
1206                  inode->i_ino, inode->i_generation);
1207
             /* Remember which inode was checked: the OI lookups below
              * overwrite *id, and the comparison tells whether the mapping
              * changed in the meantime. */
1208         saved_ino = inode->i_ino;
1209         saved_gen = inode->i_generation;
1210
1211         if (unlikely(result == -ENODATA)) {
1212                 /* If the OI scrub updated the OI mapping by race, it
1213                  * must be valid. Trust the inode that has no LMA EA. */
1214                 if (updated)
1215                         goto found;
1216
1217                 result = osd_oi_lookup(info, dev, fid, id, OI_CHECK_FLD);
1218                 if (!result) {
1219                         /* The OI mapping is still there, the inode is still
1220                          * valid. It is just because the inode has no LMA EA. */
1221                         if (saved_ino == id->oii_ino &&
1222                             saved_gen == id->oii_gen)
1223                                 goto found;
1224
1225                         /* It is the OI scrub updated the OI mapping by race.
1226                          * The new OI mapping must be valid. */
1227                         trusted = true;
1228                         updated = true;
1229                         goto iget;
1230                 }
1231
1232                 /* "result == -ENOENT" means that the OI mapping has been
1233                  * removed by race, so the inode belongs to other object.
1234                  *
1235                  * Other errors can be returned directly. */
1236                 if (result == -ENOENT) {
1237                         LASSERT(trusted);
1238
1239                         obj->oo_inode = NULL;
1240                         result = 0;
1241                 }
1242         }
1243
1244         if (result != -EREMCHG)
1245                 GOTO(out, result);
1246
1247         LASSERT(!updated);
1248
1249         result = osd_oi_lookup(info, dev, fid, id, OI_CHECK_FLD);
1250         /* "result == -ENOENT" means the cached OI mapping has been removed
1251          * from the OI file by race, above inode belongs to other object. */
1252         if (result == -ENOENT) {
1253                 LASSERT(trusted);
1254
1255                 obj->oo_inode = NULL;
1256                 GOTO(out, result = 0);
1257         }
1258
1259         if (result)
1260                 GOTO(out, result);
1261
             /* Mapping unchanged but the LMA disagrees: inconsistent, go
              * through the remote-parent / scrub path. */
1262         if (saved_ino == id->oii_ino && saved_gen == id->oii_gen) {
1263                 result = -EREMCHG;
1264                 goto trigger;
1265         }
1266
1267         /* It is the OI scrub updated the OI mapping by race.
1268          * The new OI mapping must be valid. */
1269         trusted = true;
1270         updated = true;
1271         goto iget;
1272
1273 found:
             /* When check_lma was bypassed, osd_check_lma() has not set the
              * LMA-derived object flags; read the LMA here and set them. */
1274         if (!checked) {
1275                 struct lustre_ost_attrs *loa = &info->oti_ost_attrs;
1276                 struct lustre_mdt_attrs *lma = &info->oti_ost_attrs.loa_lma;
1277
1278                 result = osd_get_lma(info, inode, &info->oti_obj_dentry, loa);
1279                 if (!result) {
1280                         if (lma->lma_compat & LMAC_STRIPE_INFO &&
1281                             dev->od_is_ost)
1282                                 obj->oo_pfid_in_lma = 1;
1283                         if (unlikely(lma->lma_incompat & LMAI_REMOTE_PARENT) &&
1284                             !dev->od_is_ost)
1285                                 lu_object_set_agent_entry(&obj->oo_dt.do_lu);
1286                 } else if (result != -ENODATA) {
1287                         GOTO(out, result);
1288                 }
1289         }
1290
1291         obj->oo_compat_dot_created = 1;
1292         obj->oo_compat_dotdot_created = 1;
1293
             /* The return value of osd_check_lmv() is deliberately not
              * examined here; a failure does not fail the lookup. */
1294         if (S_ISDIR(inode->i_mode) &&
1295             (flags & SS_AUTO_PARTIAL || sf->sf_status == SS_SCANNING))
1296                 osd_check_lmv(info, dev, inode, oic);
1297
1298         result = osd_attach_jinode(inode);
1299         if (result)
1300                 GOTO(out, result);
1301
1302         if (!ldiskfs_pdo)
1303                 GOTO(out, result = 0);
1304
1305         LASSERT(!obj->oo_hl_head);
1306         obj->oo_hl_head = ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
1307
1308         GOTO(out, result = (!obj->oo_hl_head ? -ENOMEM : 0));
1309
1310 out:
1311         if (result || !obj->oo_inode) {
1312                 if (!IS_ERR_OR_NULL(inode))
1313                         iput(inode);
1314
1315                 obj->oo_inode = NULL;
1316                 if (trusted)
1317                         fid_zero(&oic->oic_fid);
1318         }
1319
1320         LINVRNT(osd_invariant(obj));
1321         return result;
1322 }
1323
1324 /*
      * Finish initialization of an object that already has an inode attached:
      * install osd_body_ops as the body operations and mark the object header
      * as existing, together with the file type bits (S_IFMT) of the inode.
      * obj->oo_inode must not be NULL (asserted).
      *
1325  * Concurrency: shouldn't matter.
1326  */
1327 static void osd_object_init0(struct osd_object *obj)
1328 {
1329         LASSERT(obj->oo_inode != NULL);
1330         obj->oo_dt.do_body_ops = &osd_body_ops;
1331         obj->oo_dt.do_lu.lo_header->loh_attr |=
1332                 (LOHA_EXISTS | (obj->oo_inode->i_mode & S_IFMT));
1333 }
1334
1335 /*
      * Initialize the OSD object \a l: look up its FID and, when an inode is
      * found, finish the setup and cache the LMA-derived flags.
      *
      * A FID accepted by fid_is_otable_it() gets osd_obj_otable_it_ops and is
      * marked as existing without any lookup.
      *
      * Returns 0 on success (also when no inode exists for the FID, or when
      * the inode has no LMA), otherwise a negative errno from
      * osd_fid_lookup() or osd_get_lma().
      *
1336  * Concurrency: no concurrent access is possible that early in object
1337  * life-cycle.
1338  */
1339 static int osd_object_init(const struct lu_env *env, struct lu_object *l,
1340                            const struct lu_object_conf *conf)
1341 {
1342         struct osd_object *obj = osd_obj(l);
1343         int result;
1344
1345         LINVRNT(osd_invariant(obj));
1346
1347         if (fid_is_otable_it(&l->lo_header->loh_fid)) {
1348                 obj->oo_dt.do_ops = &osd_obj_otable_it_ops;
1349                 l->lo_header->loh_attr |= LOHA_EXISTS;
1350                 return 0;
1351         }
1352
1353         result = osd_fid_lookup(env, obj, lu_object_fid(l), conf);
             /* Set unconditionally; osd_object_init0() below replaces it with
              * osd_body_ops once an inode is known to exist. */
1354         obj->oo_dt.do_body_ops = &osd_body_ops_new;
1355         if (result == 0 && obj->oo_inode != NULL) {
1356                 struct osd_thread_info *oti = osd_oti_get(env);
1357                 struct lustre_ost_attrs *loa = &oti->oti_ost_attrs;
1358
1359                 osd_object_init0(obj);
1360                 if (unlikely(obj->oo_header))
1361                         return 0;
1362
1363                 result = osd_get_lma(oti, obj->oo_inode,
1364                                      &oti->oti_obj_dentry, loa);
1365                 if (!result) {
1366                         /* Convert LMAI flags to lustre LMA flags
1367                          * and cache it to oo_lma_flags */
1368                         obj->oo_lma_flags =
1369                                 lma_to_lustre_flags(loa->loa_lma.lma_incompat);
1370                 } else if (result == -ENODATA) {
                             /* An inode without LMA is not an error here. */
1371                         result = 0;
1372                 }
1373         }
1374
1375         LINVRNT(osd_invariant(obj));
1376         return result;
1377 }
1378
1379 /* The first part of oxe_buf is xattr name, and is '\0' terminated.
1380  * The left part is for value, binary mode. */
      *
      * One entry of the per-object xattr cache kept on oo_xattr_list.
      * An entry with oxe_exist == false records that the xattr is known
      * to be absent; it carries a name but no value. */
1381 struct osd_xattr_entry {
             /* linkage into obj->oo_xattr_list */
1382         struct list_head        oxe_list;
             /* size of the whole allocation: header + name + '\0' + value */
1383         size_t                  oxe_len;
             /* length of the name, terminator not included */
1384         size_t                  oxe_namelen;
             /* false for a cached "no such xattr" entry */
1385         bool                    oxe_exist;
             /* used to defer freeing via call_rcu() */
1386         struct rcu_head         oxe_rcu;
1387         char                    oxe_buf[0];
1388 };
1389
/*
 * Look up the xattr \a name in the per-object xattr cache.
 *
 * The list is walked under rcu_read_lock().
 *
 * \param[in]  obj   object whose cache is searched
 * \param[in]  name  xattr name
 * \param[out] buf   receives the value; when buf->lb_buf is NULL only the
 *                   value length is returned
 *
 * \retval >0        length of the cached value
 * \retval -ENOENT   the name is not in the cache
 * \retval -ENODATA  the cache records that the xattr does not exist
 * \retval -ERANGE   buf->lb_len is smaller than the value
 */
1390 static int osd_oxc_get(struct osd_object *obj, const char *name,
1391                        struct lu_buf *buf)
1392 {
1393         struct osd_xattr_entry *tmp;
1394         struct osd_xattr_entry *oxe = NULL;
1395         size_t namelen = strlen(name);
1396         int rc;
1397         ENTRY;
1398
1399         rcu_read_lock();
1400         list_for_each_entry_rcu(tmp, &obj->oo_xattr_list, oxe_list) {
1401                 if (namelen == tmp->oxe_namelen &&
1402                     strncmp(name, tmp->oxe_buf, namelen) == 0) {
1403                         oxe = tmp;
1404                         break;
1405                 }
1406         }
1407
1408         if (oxe == NULL)
1409                 GOTO(out, rc = -ENOENT);
1410
1411         if (!oxe->oxe_exist)
1412                 GOTO(out, rc = -ENODATA);
1413
1414         /* vallen */
             /* total size minus header, name and the name's terminator */
1415         rc = oxe->oxe_len - sizeof(*oxe) - oxe->oxe_namelen - 1;
1416         LASSERT(rc > 0);
1417
1418         if (buf->lb_buf == NULL)
1419                 GOTO(out, rc);
1420
1421         if (buf->lb_len < rc)
1422                 GOTO(out, rc = -ERANGE);
1423
1424         memcpy(buf->lb_buf, &oxe->oxe_buf[namelen + 1], rc);
1425         EXIT;
1426 out:
1427         rcu_read_unlock();
1428
1429         return rc;
1430 }
1431
/*
 * RCU callback: free the xattr cache entry that embeds \a head.
 * Registered through call_rcu() by osd_oxc_add() and osd_oxc_del().
 */
1432 static void osd_oxc_free(struct rcu_head *head)
1433 {
1434         struct osd_xattr_entry *oxe;
1435
1436         oxe = container_of(head, struct osd_xattr_entry, oxe_rcu);
1437         OBD_FREE(oxe, oxe->oxe_len);
1438 }
1439
/*
 * Insert (or replace) the cache entry for xattr \a name on \a obj.
 *
 * With buflen > 0 the value in \a buf is cached; otherwise a negative entry
 * (oxe_exist == false) is recorded. An existing entry with the same name is
 * replaced and freed through call_rcu(). The list is modified under
 * obj->oo_guard.
 *
 * Nothing is returned: if the allocation fails the cache is simply left
 * unchanged.
 *
 * \param[in] obj     object owning the cache
 * \param[in] name    xattr name
 * \param[in] buf     value to cache; must not be NULL when buflen > 0
 * \param[in] buflen  length of the value
 */
1440 static void osd_oxc_add(struct osd_object *obj, const char *name,
1441                         const char *buf, int buflen)
1442 {
1443         struct osd_xattr_entry *oxe;
1444         struct osd_xattr_entry *old = NULL;
1445         struct osd_xattr_entry *tmp;
1446         size_t namelen = strlen(name);
1447         size_t len = sizeof(*oxe) + namelen + 1 + buflen;
1448
1449         OBD_ALLOC(oxe, len);
1450         if (oxe == NULL)
1451                 return;
1452
1453         INIT_LIST_HEAD(&oxe->oxe_list);
1454         oxe->oxe_len = len;
1455         oxe->oxe_namelen = namelen;
             /* Only namelen bytes are copied; the '\0' after the name is not
              * written explicitly.
              * NOTE(review): presumably OBD_ALLOC() zero-fills - confirm. */
1456         memcpy(oxe->oxe_buf, name, namelen);
1457         if (buflen > 0) {
1458                 LASSERT(buf != NULL);
1459                 memcpy(oxe->oxe_buf + namelen + 1, buf, buflen);
1460                 oxe->oxe_exist = true;
1461         } else {
1462                 oxe->oxe_exist = false;
1463         }
1464
1465         /* this should be rarely called, just remove old and add new */
1466         spin_lock(&obj->oo_guard);
1467         list_for_each_entry(tmp, &obj->oo_xattr_list, oxe_list) {
1468                 if (namelen == tmp->oxe_namelen &&
1469                     strncmp(name, tmp->oxe_buf, namelen) == 0) {
1470                         old = tmp;
1471                         break;
1472                 }
1473         }
1474         if (old != NULL) {
1475                 list_replace_rcu(&old->oxe_list, &oxe->oxe_list);
1476                 call_rcu(&old->oxe_rcu, osd_oxc_free);
1477         } else {
1478                 list_add_tail_rcu(&oxe->oxe_list, &obj->oo_xattr_list);
1479         }
1480         spin_unlock(&obj->oo_guard);
1481 }
1482
/*
 * Drop the cache entry for xattr \a name from \a obj, if there is one.
 * The entry is unlinked under obj->oo_guard and freed through call_rcu().
 */
1483 static void osd_oxc_del(struct osd_object *obj, const char *name)
1484 {
1485         struct osd_xattr_entry *oxe;
1486         size_t namelen = strlen(name);
1487
1488         spin_lock(&obj->oo_guard);
1489         list_for_each_entry(oxe, &obj->oo_xattr_list, oxe_list) {
1490                 if (namelen == oxe->oxe_namelen &&
1491                     strncmp(name, oxe->oxe_buf, namelen) == 0) {
1492                         list_del_rcu(&oxe->oxe_list);
1493                         call_rcu(&oxe->oxe_rcu, osd_oxc_free);
                             /* stop at the first match; the iteration must not
                              * continue from the entry just unlinked */
1494                         break;
1495                 }
1496         }
1497         spin_unlock(&obj->oo_guard);
1498 }
1499
/*
 * Free every entry of the xattr cache of \a obj.
 *
 * Entries are unlinked and freed directly, without taking oo_guard and
 * without an RCU grace period.
 * NOTE(review): this presumably relies on the caller guaranteeing that no
 * concurrent access is possible (it is called from osd_object_free()) -
 * confirm for any new caller.
 */
1500 static void osd_oxc_fini(struct osd_object *obj)
1501 {
1502         struct osd_xattr_entry *oxe, *next;
1503
1504         list_for_each_entry_safe(oxe, next, &obj->oo_xattr_list, oxe_list) {
1505                 list_del(&oxe->oxe_list);
1506                 OBD_FREE(oxe, oxe->oxe_len);
1507         }
1508 }
1509
1510 /*
      * Release the OSD object \a l: drop its xattr cache, finalize the
      * dt_object, free the htree lock head if one was allocated, and free the
      * object itself. If the object has a header recorded in oo_header, that
      * header is finalized and freed as well.
      *
1511  * Concurrency: no concurrent access is possible that late in object
1512  * life-cycle.
1513  */
1514 static void osd_object_free(const struct lu_env *env, struct lu_object *l)
1515 {
1516         struct osd_object *obj = osd_obj(l);
             /* fetched up front: obj is freed before the header is handled */
1517         struct lu_object_header *h = obj->oo_header;
1518
1519         LINVRNT(osd_invariant(obj));
1520
1521         osd_oxc_fini(obj);
1522         dt_object_fini(&obj->oo_dt);
1523         if (obj->oo_hl_head != NULL)
1524                 ldiskfs_htree_lock_head_free(obj->oo_hl_head);
1525         OBD_FREE_PTR(obj);
1526         if (unlikely(h)) {
1527                 lu_object_header_fini(h);
1528                 OBD_FREE_PTR(h);
1529         }
1530 }
1531
1532 /*
      * Release the index state (o->oo_dir) of \a o, if any. The IAM container
      * is finalized only when the object still has an inode and the container
      * is bound to that very inode; oo_dir is freed and reset to NULL in any
      * case.
      *
1533  * Concurrency: no concurrent access is possible that late in object
1534  * life-cycle.
1535  */
1536 static void osd_index_fini(struct osd_object *o)
1537 {
1538         struct iam_container *bag;
1539
1540         if (o->oo_dir != NULL) {
1541                 bag = &o->oo_dir->od_container;
1542                 if (o->oo_inode != NULL) {
1543                         if (bag->ic_object == o->oo_inode)
1544                                 iam_container_fini(bag);
1545                 }
1546                 OBD_FREE_PTR(o->oo_dir);
1547                 o->oo_dir = NULL;
1548         }
1549 }
1550
1551 /*
      * Return non-zero when the link count of \a inode is zero.
      *
1552  * Concurrency: no concurrent access is possible that late in object
1553  * life-cycle (for all existing callers, that is. New callers have to provide
1554  * their own locking.)
1555  */
1556 static int osd_inode_unlinked(const struct inode *inode)
1557 {
1558         return inode->i_nlink == 0;
1559 }
1560
/* NOTE(review): judging by the names these are presumably the transaction
 * credits reserved for removing an OI entry and an inode respectively;
 * the users are outside this part of the file - confirm there. */
1561 enum {
1562         OSD_TXN_OI_DELETE_CREDITS    = 20,
1563         OSD_TXN_INODE_DELETE_CREDITS = 20
1564 };
1565
1566 /*
1567  * Journal
1568  */
1569
1570 #if OSD_THANDLE_STATS
1571 /**
1572  * Set time when the handle is allocated
      *
      * Stores the current ktime_get() value in oth->oth_alloced. Only built
      * when OSD_THANDLE_STATS is enabled.
1573  */
1574 static void osd_th_alloced(struct osd_thandle *oth)
1575 {
1576         oth->oth_alloced = ktime_get();
1577 }
1578
1579 /**
1580  * Set time when the handle started
      *
      * Stores the current ktime_get() value in oth->oth_started. Only built
      * when OSD_THANDLE_STATS is enabled.
1581  */
1582 static void osd_th_started(struct osd_thandle *oth)
1583 {
1584         oth->oth_started = ktime_get();
1585 }
1586
1587 /**
1588  * Check whether we have been dealing with this handle for too long.
      *
      * Adds the three phase durations, in microseconds, to the device
      * statistics: alloced => started, started => closed and closed => now.
      * If more than 30 seconds have passed since \a alloced, a warning is
      * printed and the stack is dumped.
      *
      * \param[in] oth      handle pointer, only printed in the warning
      * \param[in] dev      device whose od_stats are updated; must not be NULL
      * \param[in] alloced  time the handle was allocated
      * \param[in] started  time the handle was started
      * \param[in] closed   time the handle was closed
1589  */
1590 static void __osd_th_check_slow(void *oth, struct osd_device *dev,
1591                                 ktime_t alloced, ktime_t started,
1592                                 ktime_t closed)
1593 {
1594         ktime_t now = ktime_get();
1595
1596         LASSERT(dev != NULL);
1597
1598         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_STARTING,
1599                             ktime_us_delta(started, alloced));
1600         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_OPEN,
1601                             ktime_us_delta(closed, started));
1602         lprocfs_counter_add(dev->od_stats, LPROC_OSD_THANDLE_CLOSING,
1603                             ktime_us_delta(now, closed));
1604
1605         if (ktime_before(ktime_add_ns(alloced, 30 * NSEC_PER_SEC), now)) {
1606                 CWARN("transaction handle %p was open for too long: now %lld, alloced %lld, started %lld, closed %lld\n",
1607                       oth, now, alloced, started, closed);
1608                 libcfs_debug_dumpstack(NULL);
1609         }
1610 }
1611
/*
 * Run \a expr and then account the handle's timings through
 * __osd_th_check_slow(). The allocation and start times are copied out of
 * \a oth, and the "closed" time is sampled, before \a expr is evaluated;
 * \a oth is passed on afterwards only as a pointer value.
 * The expansion is a plain brace block, not a do { } while (0) statement.
 */
1612 #define OSD_CHECK_SLOW_TH(oth, dev, expr)                               \
1613 {                                                                       \
1614         ktime_t __closed = ktime_get();                                 \
1615         ktime_t __alloced = oth->oth_alloced;                           \
1616         ktime_t __started = oth->oth_started;                           \
1617                                                                         \
1618         expr;                                                           \
1619         __osd_th_check_slow(oth, dev, __alloced, __started, __closed);  \
1620 }
1621
1622 #else /* OSD_THANDLE_STATS */
1623
/*
 * Handle timing is compiled out: the stamp hooks expand to empty statements
 * and the check macro expands to the wrapped expression alone.
 */
#define osd_th_alloced(h)			do { } while (0)
#define osd_th_started(h)			do { } while (0)
#define OSD_CHECK_SLOW_TH(oth, dev, expr)	expr
1627
1628 #endif /* OSD_THANDLE_STATS */
1629
1630 /*
1631  * Concurrency: doesn't access mutable data.
1632  */
1633 static int osd_param_is_not_sane(const struct osd_device *dev,
1634                                  const struct thandle *th)
1635 {
1636         struct osd_thandle *oh = container_of(th, typeof(*oh), ot_super);
1637
1638         return oh->ot_credits > osd_transaction_size(dev);
1639 }
1640
1641 /*
1642  * Concurrency: shouldn't matter.
1643  */
1644 static void osd_trans_commit_cb(struct super_block *sb,
1645                                 struct ldiskfs_journal_cb_entry *jcb, int error)
1646 {
1647         struct osd_thandle *oh = container_of0(jcb, struct osd_thandle, ot_jcb);
1648         struct thandle     *th  = &oh->ot_super;
1649         struct lu_device   *lud = &th->th_dev->dd_lu_dev;
1650         struct dt_txn_commit_cb *dcb, *tmp;
1651
1652         LASSERT(oh->ot_handle == NULL);
1653
1654         if (error)
1655                 CERROR("transaction @0x%p commit error: %d\n", th, error);
1656
1657         dt_txn_hook_commit(th);
1658
1659         /* call per-transaction callbacks if any */
1660         list_for_each_entry_safe(dcb, tmp, &oh->ot_commit_dcb_list,
1661                                  dcb_linkage) {
1662                 LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
1663                          "commit callback entry: magic=%x name='%s'\n",
1664                          dcb->dcb_magic, dcb->dcb_name);
1665                 list_del_init(&dcb->dcb_linkage);
1666                 dcb->dcb_func(NULL, th, dcb, error);
1667         }
1668
1669         lu_ref_del_at(&lud->ld_reference, &oh->ot_dev_link, "osd-tx", th);
1670         lu_device_put(lud);
1671         th->th_dev = NULL;
1672
1673         OBD_FREE_PTR(oh);
1674 }
1675
/* When HAVE_SB_START_WRITE is not defined, provide no-op replacements. */
#if !defined(HAVE_SB_START_WRITE)
# define sb_start_write(sb)	do {} while (0)
# define sb_end_write(sb)	do {} while (0)
#endif
1680
1681 static struct thandle *osd_trans_create(const struct lu_env *env,
1682                                         struct dt_device *d)
1683 {
1684         struct osd_thread_info  *oti = osd_oti_get(env);
1685         struct osd_iobuf        *iobuf = &oti->oti_iobuf;
1686         struct osd_thandle      *oh;
1687         struct thandle          *th;
1688         ENTRY;
1689
1690         if (d->dd_rdonly) {
1691                 CERROR("%s: someone try to start transaction under "
1692                        "readonly mode, should be disabled.\n",
1693                        osd_name(osd_dt_dev(d)));
1694                 dump_stack();
1695                 RETURN(ERR_PTR(-EROFS));
1696         }
1697
1698         /* on pending IO in this thread should left from prev. request */
1699         LASSERT(atomic_read(&iobuf->dr_numreqs) == 0);
1700
1701         sb_start_write(osd_sb(osd_dt_dev(d)));
1702
1703         OBD_ALLOC_GFP(oh, sizeof *oh, GFP_NOFS);
1704         if (!oh) {
1705                 sb_end_write(osd_sb(osd_dt_dev(d)));
1706                 RETURN(ERR_PTR(-ENOMEM));
1707         }
1708
1709         oh->ot_quota_trans = &oti->oti_quota_trans;
1710         memset(oh->ot_quota_trans, 0, sizeof(*oh->ot_quota_trans));
1711         th = &oh->ot_super;
1712         th->th_dev = d;
1713         th->th_result = 0;
1714         oh->ot_credits = 0;
1715         INIT_LIST_HEAD(&oh->ot_commit_dcb_list);
1716         INIT_LIST_HEAD(&oh->ot_stop_dcb_list);
1717         INIT_LIST_HEAD(&oh->ot_trunc_locks);
1718         osd_th_alloced(oh);
1719
1720         memset(oti->oti_declare_ops, 0,
1721                sizeof(oti->oti_declare_ops));
1722         memset(oti->oti_declare_ops_cred, 0,
1723                sizeof(oti->oti_declare_ops_cred));
1724         memset(oti->oti_declare_ops_used, 0,
1725                sizeof(oti->oti_declare_ops_used));
1726
1727         oti->oti_ins_cache_depth++;
1728
1729         RETURN(th);
1730 }
1731
1732 void osd_trans_dump_creds(const struct lu_env *env, struct thandle *th)
1733 {
1734         struct osd_thread_info  *oti = osd_oti_get(env);
1735         struct osd_thandle      *oh;
1736
1737         oh = container_of0(th, struct osd_thandle, ot_super);
1738         LASSERT(oh != NULL);
1739
1740         CWARN("  create: %u/%u/%u, destroy: %u/%u/%u\n",
1741               oti->oti_declare_ops[OSD_OT_CREATE],
1742               oti->oti_declare_ops_cred[OSD_OT_CREATE],
1743               oti->oti_declare_ops_used[OSD_OT_CREATE],
1744               oti->oti_declare_ops[OSD_OT_DESTROY],
1745               oti->oti_declare_ops_cred[OSD_OT_DESTROY],
1746               oti->oti_declare_ops_used[OSD_OT_DESTROY]);
1747         CWARN("  attr_set: %u/%u/%u, xattr_set: %u/%u/%u\n",
1748               oti->oti_declare_ops[OSD_OT_ATTR_SET],
1749               oti->oti_declare_ops_cred[OSD_OT_ATTR_SET],
1750               oti->oti_declare_ops_used[OSD_OT_ATTR_SET],
1751               oti->oti_declare_ops[OSD_OT_XATTR_SET],
1752               oti->oti_declare_ops_cred[OSD_OT_XATTR_SET],
1753               oti->oti_declare_ops_used[OSD_OT_XATTR_SET]);
1754         CWARN("  write: %u/%u/%u, punch: %u/%u/%u, quota %u/%u/%u\n",
1755               oti->oti_declare_ops[OSD_OT_WRITE],
1756               oti->oti_declare_ops_cred[OSD_OT_WRITE],
1757               oti->oti_declare_ops_used[OSD_OT_WRITE],
1758               oti->oti_declare_ops[OSD_OT_PUNCH],
1759               oti->oti_declare_ops_cred[OSD_OT_PUNCH],
1760               oti->oti_declare_ops_used[OSD_OT_PUNCH],
1761               oti->oti_declare_ops[OSD_OT_QUOTA],
1762               oti->oti_declare_ops_cred[OSD_OT_QUOTA],
1763               oti->oti_declare_ops_used[OSD_OT_QUOTA]);
1764         CWARN("  insert: %u/%u/%u, delete: %u/%u/%u\n",
1765               oti->oti_declare_ops[OSD_OT_INSERT],
1766               oti->oti_declare_ops_cred[OSD_OT_INSERT],
1767               oti->oti_declare_ops_used[OSD_OT_INSERT],
1768               oti->oti_declare_ops[OSD_OT_DELETE],
1769               oti->oti_declare_ops_cred[OSD_OT_DELETE],
1770               oti->oti_declare_ops_used[OSD_OT_DELETE]);
1771         CWARN("  ref_add: %u/%u/%u, ref_del: %u/%u/%u\n",
1772               oti->oti_declare_ops[OSD_OT_REF_ADD],
1773               oti->oti_declare_ops_cred[OSD_OT_REF_ADD],
1774               oti->oti_declare_ops_used[OSD_OT_REF_ADD],
1775               oti->oti_declare_ops[OSD_OT_REF_DEL],
1776               oti->oti_declare_ops_cred[OSD_OT_REF_DEL],
1777               oti->oti_declare_ops_used[OSD_OT_REF_DEL]);
1778 }
1779
1780 /*
1781  * Concurrency: shouldn't matter.
1782  */
1783 static int osd_trans_start(const struct lu_env *env, struct dt_device *d,
1784                            struct thandle *th)
1785 {
1786         struct osd_thread_info *oti = osd_oti_get(env);
1787         struct osd_device  *dev = osd_dt_dev(d);
1788         handle_t           *jh;
1789         struct osd_thandle *oh;
1790         int rc;
1791
1792         ENTRY;
1793
1794         LASSERT(current->journal_info == NULL);
1795
1796         oh = container_of0(th, struct osd_thandle, ot_super);
1797         LASSERT(oh != NULL);
1798         LASSERT(oh->ot_handle == NULL);
1799
1800         rc = dt_txn_hook_start(env, d, th);
1801         if (rc != 0)
1802                 GOTO(out, rc);
1803
1804         if (unlikely(osd_param_is_not_sane(dev, th))) {
1805                 static unsigned long last_printed;
1806                 static int last_credits;
1807
1808                 /* don't make noise on a tiny testing systems
1809                  * actual credits misuse will be caught anyway */
1810                 if (last_credits != oh->ot_credits &&
1811                     time_after(jiffies, last_printed +
1812                                msecs_to_jiffies(60 * MSEC_PER_SEC)) &&
1813                     osd_transaction_size(dev) > 512) {
1814                         CWARN("%s: credits %u > trans_max %u\n", osd_name(dev),
1815                               oh->ot_credits, osd_transaction_size(dev));
1816                         osd_trans_dump_creds(env, th);
1817                         libcfs_debug_dumpstack(NULL);
1818                         last_credits = oh->ot_credits;
1819                         last_printed = jiffies;
1820                 }
1821                 /* XXX Limit the credits to 'max_transaction_buffers', and
1822                  *     let the underlying filesystem to catch the error if
1823                  *     we really need so many credits.
1824                  *
1825                  *     This should be removed when we can calculate the
1826                  *     credits precisely. */
1827                 oh->ot_credits = osd_transaction_size(dev);
1828         } else if (ldiskfs_track_declares_assert != 0) {
1829                 /* reserve few credits to prevent an assertion in JBD
1830                  * our debugging mechanism will be able to detected
1831                  * overuse. this can help to debug single-update
1832                  * transactions */
1833                 oh->ot_credits += 10;
1834                 if (unlikely(osd_param_is_not_sane(dev, th)))
1835                         oh->ot_credits = osd_transaction_size(dev);
1836         }
1837
1838         /*
1839          * XXX temporary stuff. Some abstraction layer should
1840          * be used.
1841          */
1842         jh = osd_journal_start_sb(osd_sb(dev), LDISKFS_HT_MISC, oh->ot_credits);
1843         osd_th_started(oh);
1844         if (!IS_ERR(jh)) {
1845                 oh->ot_handle = jh;
1846                 LASSERT(oti->oti_txns == 0);
1847
1848                 lu_device_get(&d->dd_lu_dev);
1849                 lu_ref_add_at(&d->dd_lu_dev.ld_reference, &oh->ot_dev_link,
1850                               "osd-tx", th);
1851                 oti->oti_txns++;
1852                 rc = 0;
1853         } else {
1854                 rc = PTR_ERR(jh);
1855         }
1856 out:
1857         RETURN(rc);
1858 }
1859
1860 static int osd_seq_exists(const struct lu_env *env,
1861                           struct osd_device *osd, u64 seq)
1862 {
1863         struct lu_seq_range     *range = &osd_oti_get(env)->oti_seq_range;
1864         struct seq_server_site  *ss = osd_seq_site(osd);
1865         int                     rc;
1866         ENTRY;
1867
1868         LASSERT(ss != NULL);
1869         LASSERT(ss->ss_server_fld != NULL);
1870
1871         rc = osd_fld_lookup(env, osd, seq, range);
1872         if (rc != 0) {
1873                 if (rc != -ENOENT)
1874                         CERROR("%s: can't lookup FLD sequence %#llx: rc = %d\n",
1875                                osd_name(osd), seq, rc);
1876                 RETURN(0);
1877         }
1878
1879         RETURN(ss->ss_node_id == range->lsr_index);
1880 }
1881
1882 static void osd_trans_stop_cb(struct osd_thandle *oth, int result)
1883 {
1884         struct dt_txn_commit_cb *dcb;
1885         struct dt_txn_commit_cb *tmp;
1886
1887         /* call per-transaction stop callbacks if any */
1888         list_for_each_entry_safe(dcb, tmp, &oth->ot_stop_dcb_list,
1889                                  dcb_linkage) {
1890                 LASSERTF(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC,
1891                          "commit callback entry: magic=%x name='%s'\n",
1892                          dcb->dcb_magic, dcb->dcb_name);
1893                 list_del_init(&dcb->dcb_linkage);
1894                 dcb->dcb_func(NULL, &oth->ot_super, dcb, result);
1895         }
1896 }
1897
1898 /*
1899  * Concurrency: shouldn't matter.
1900  */
1901 static int osd_trans_stop(const struct lu_env *env, struct dt_device *dt,
1902                           struct thandle *th)
1903 {
1904         struct osd_thread_info *oti = osd_oti_get(env);
1905         struct osd_thandle *oh;
1906         struct osd_iobuf *iobuf = &oti->oti_iobuf;
1907         struct osd_device *osd = osd_dt_dev(th->th_dev);
1908         struct qsd_instance *qsd = osd->od_quota_slave;
1909         struct lquota_trans *qtrans;
1910         struct list_head truncates = LIST_HEAD_INIT(truncates);
1911         int rc = 0, remove_agents = 0;
1912         ENTRY;
1913
1914         oh = container_of0(th, struct osd_thandle, ot_super);
1915
1916         remove_agents = oh->ot_remove_agents;
1917
1918         qtrans = oh->ot_quota_trans;
1919         oh->ot_quota_trans = NULL;
1920
1921         /* move locks to local list, stop tx, execute truncates */
1922         list_splice(&oh->ot_trunc_locks, &truncates);
1923
1924         if (oh->ot_handle != NULL) {
1925                 int rc2;
1926                 handle_t *hdl = oh->ot_handle;
1927
1928                 /*
1929                  * add commit callback
1930                  * notice we don't do this in osd_trans_start()
1931                  * as underlying transaction can change during truncate
1932                  */
1933                 ldiskfs_journal_callback_add(hdl, osd_trans_commit_cb,
1934                                          &oh->ot_jcb);
1935
1936                 LASSERT(oti->oti_txns == 1);
1937                 oti->oti_txns--;
1938
1939                 rc = dt_txn_hook_stop(env, th);
1940                 if (rc != 0)
1941                         CERROR("%s: failed in transaction hook: rc = %d\n",
1942                                osd_name(osd), rc);
1943
1944                 osd_trans_stop_cb(oh, rc);
1945                 /* hook functions might modify th_sync */
1946                 hdl->h_sync = th->th_sync;
1947
1948                 oh->ot_handle = NULL;
1949                 OSD_CHECK_SLOW_TH(oh, osd, rc2 = ldiskfs_journal_stop(hdl));
1950                 if (rc2 != 0)
1951                         CERROR("%s: failed to stop transaction: rc = %d\n",
1952                                osd_name(osd), rc2);
1953                 if (!rc)
1954                         rc = rc2;
1955
1956                 osd_process_truncates(&truncates);
1957         } else {
1958                 osd_trans_stop_cb(oh, th->th_result);
1959                 OBD_FREE_PTR(oh);
1960         }
1961
1962         osd_trunc_unlock_all(&truncates);
1963
1964         /* inform the quota slave device that the transaction is stopping */
1965         qsd_op_end(env, qsd, qtrans);
1966
1967         /* as we want IO to journal and data IO be concurrent, we don't block
1968          * awaiting data IO completion in osd_do_bio(), instead we wait here
1969          * once transaction is submitted to the journal. all reqular requests
1970          * don't do direct IO (except read/write), thus this wait_event becomes
1971          * no-op for them.
1972          *
1973          * IMPORTANT: we have to wait till any IO submited by the thread is
1974          * completed otherwise iobuf may be corrupted by different request
1975          */
1976         wait_event(iobuf->dr_wait,
1977                        atomic_read(&iobuf->dr_numreqs) == 0);
1978         osd_fini_iobuf(osd, iobuf);
1979         if (!rc)
1980                 rc = iobuf->dr_error;
1981
1982         if (unlikely(remove_agents != 0))
1983                 osd_process_scheduled_agent_removals(env, osd);
1984
1985         oti->oti_ins_cache_depth--;
1986         /* reset OI cache for safety */
1987         if (oti->oti_ins_cache_depth == 0)
1988                 oti->oti_ins_cache_used = 0;
1989
1990         sb_end_write(osd_sb(osd));
1991
1992         RETURN(rc);
1993 }
1994
1995 static int osd_trans_cb_add(struct thandle *th, struct dt_txn_commit_cb *dcb)
1996 {
1997         struct osd_thandle *oh = container_of0(th, struct osd_thandle,
1998                                                ot_super);
1999
2000         LASSERT(dcb->dcb_magic == TRANS_COMMIT_CB_MAGIC);
2001         LASSERT(&dcb->dcb_func != NULL);
2002         if (dcb->dcb_flags & DCB_TRANS_STOP)
2003                 list_add(&dcb->dcb_linkage, &oh->ot_stop_dcb_list);
2004         else
2005                 list_add(&dcb->dcb_linkage, &oh->ot_commit_dcb_list);
2006
2007         return 0;
2008 }
2009
2010 /*
2011  * Called just before object is freed. Releases all resources except for
2012  * object itself (that is released by osd_object_free()).
2013  *
2014  * Concurrency: no concurrent access is possible that late in object
2015  * life-cycle.
2016  */
2017 static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
2018 {
2019         struct osd_object *obj   = osd_obj(l);
2020         struct inode      *inode = obj->oo_inode;
2021
2022         LINVRNT(osd_invariant(obj));
2023
2024         /*
2025          * If object is unlinked remove fid->ino mapping from object index.
2026          */
2027
2028         osd_index_fini(obj);
2029         if (inode != NULL) {
2030                 struct qsd_instance     *qsd = osd_obj2dev(obj)->od_quota_slave;
2031                 qid_t                    uid = i_uid_read(inode);
2032                 qid_t                    gid = i_gid_read(inode);
2033
2034                 obj->oo_inode = NULL;
2035                 iput(inode);
2036                 if (!obj->oo_header && qsd) {
2037                         struct osd_thread_info  *info = osd_oti_get(env);
2038                         struct lquota_id_info   *qi = &info->oti_qi;
2039
2040                         /* Release granted quota to master if necessary */
2041                         qi->lqi_id.qid_uid = uid;
2042                         qsd_op_adjust(env, qsd, &qi->lqi_id, USRQUOTA);
2043
2044                         qi->lqi_id.qid_uid = gid;
2045                         qsd_op_adjust(env, qsd, &qi->lqi_id, GRPQUOTA);
2046
2047                         qi->lqi_id.qid_uid = i_projid_read(inode);
2048                         qsd_op_adjust(env, qsd, &qi->lqi_id, PRJQUOTA);
2049                 }
2050         }
2051 }
2052
2053 /*
2054  * Concurrency: ->loo_object_release() is called under site spin-lock.
2055  */
2056 static void osd_object_release(const struct lu_env *env,
2057                                struct lu_object *l)
2058 {
2059         struct osd_object *o = osd_obj(l);
2060         /* nobody should be releasing a non-destroyed object with nlink=0
2061          * the API allows this, but ldiskfs doesn't like and then report
2062          * this inode as deleted */
2063         if (unlikely(!o->oo_destroyed && o->oo_inode && o->oo_inode->i_nlink == 0))
2064                 LBUG();
2065 }
2066
2067 /*
2068  * Concurrency: shouldn't matter.
2069  */
2070 static int osd_object_print(const struct lu_env *env, void *cookie,
2071                             lu_printer_t p, const struct lu_object *l)
2072 {
2073         struct osd_object *o = osd_obj(l);
2074         struct iam_descr  *d;
2075
2076         if (o->oo_dir != NULL)
2077                 d = o->oo_dir->od_container.ic_descr;
2078         else
2079                 d = NULL;
2080         return (*p)(env, cookie,
2081                     LUSTRE_OSD_LDISKFS_NAME"-object@%p(i:%p:%lu/%u)[%s]",
2082                     o, o->oo_inode,
2083                     o->oo_inode ? o->oo_inode->i_ino : 0UL,
2084                     o->oo_inode ? o->oo_inode->i_generation : 0,
2085                     d ? d->id_ops->id_name : "plain");
2086 }
2087
2088 /*
2089  * Concurrency: shouldn't matter.
2090  */
2091 int osd_statfs(const struct lu_env *env, struct dt_device *d,
2092                struct obd_statfs *sfs)
2093 {
2094         struct osd_device       *osd = osd_dt_dev(d);
2095         struct super_block      *sb = osd_sb(osd);
2096         struct kstatfs          *ksfs;
2097         __u64                    reserved;
2098         int                      result = 0;
2099
2100         if (unlikely(osd->od_mnt == NULL))
2101                 return -EINPROGRESS;
2102
2103         /* osd_lproc.c call this without env, allocate ksfs for that case */
2104         if (unlikely(env == NULL)) {
2105                 OBD_ALLOC_PTR(ksfs);
2106                 if (ksfs == NULL)
2107                         return -ENOMEM;
2108         } else {
2109                 ksfs = &osd_oti_get(env)->oti_ksfs;
2110         }
2111
2112         result = sb->s_op->statfs(sb->s_root, ksfs);
2113         if (result)
2114                 goto out;
2115
2116         statfs_pack(sfs, ksfs);
2117         if (unlikely(sb->s_flags & MS_RDONLY))
2118                 sfs->os_state |= OS_STATE_READONLY;
2119         if (ldiskfs_has_feature_extents(sb))
2120                 sfs->os_maxbytes = sb->s_maxbytes;
2121         else
2122                 sfs->os_maxbytes = LDISKFS_SB(sb)->s_bitmap_maxbytes;
2123
2124         /*
2125          * Reserve some space so to avoid fragmenting the filesystem too much.
2126          * Fragmentation not only impacts performance, but can also increase
2127          * metadata overhead significantly, causing grant calculation to be
2128          * wrong.
2129          *
2130          * Reserve 0.78% of total space, at least 8MB for small filesystems.
2131          */
2132         CLASSERT(OSD_STATFS_RESERVED > LDISKFS_MAX_BLOCK_SIZE);
2133         reserved = OSD_STATFS_RESERVED >> sb->s_blocksize_bits;
2134         if (likely(sfs->os_blocks >= reserved << OSD_STATFS_RESERVED_SHIFT))
2135                 reserved = sfs->os_blocks >> OSD_STATFS_RESERVED_SHIFT;
2136
2137         sfs->os_blocks -= reserved;
2138         sfs->os_bfree  -= min(reserved, sfs->os_bfree);
2139         sfs->os_bavail -= min(reserved, sfs->os_bavail);
2140
2141 out:
2142         if (unlikely(env == NULL))
2143                 OBD_FREE_PTR(ksfs);
2144         return result;
2145 }
2146
/**
 * Estimate of the space needed per file creation.  The longest filename is
 * assumed to be 2^64 - 1 written out, i.e. 20 characters, so the cost is
 * one directory record for a 20-character name.
 * This is 28 bytes per object which is 28MB for 1M objects ... not so bad.
 */
#if defined(__LDISKFS_DIR_REC_LEN)
# define PER_OBJ_USAGE	__LDISKFS_DIR_REC_LEN(20)
#else
# define PER_OBJ_USAGE	LDISKFS_DIR_REC_LEN(20)
#endif
2157
2158 /*
2159  * Concurrency: doesn't access mutable data.
2160  */
2161 static void osd_conf_get(const struct lu_env *env,
2162                          const struct dt_device *dev,
2163                          struct dt_device_param *param)
2164 {
2165         struct osd_device *d = osd_dt_dev(dev);
2166         struct super_block *sb = osd_sb(d);
2167         struct blk_integrity *bi = bdev_get_integrity(sb->s_bdev);
2168         const char *name;
2169         int ea_overhead;
2170
2171         /*
2172          * XXX should be taken from not-yet-existing fs abstraction layer.
2173          */
2174         param->ddp_max_name_len = LDISKFS_NAME_LEN;
2175         param->ddp_max_nlink    = LDISKFS_LINK_MAX;
2176         param->ddp_symlink_max  = sb->s_blocksize;
2177         param->ddp_mount_type     = LDD_MT_LDISKFS;
2178         if (ldiskfs_has_feature_extents(sb))
2179                 param->ddp_maxbytes = sb->s_maxbytes;
2180         else
2181                 param->ddp_maxbytes = LDISKFS_SB(sb)->s_bitmap_maxbytes;
2182         /* inode are statically allocated, so per-inode space consumption
2183          * is the space consumed by the directory entry */
2184         param->ddp_inodespace     = PER_OBJ_USAGE;
2185         /* EXT_INIT_MAX_LEN is the theoretical maximum extent size  (32k blocks
2186          * = 128MB) which is unlikely to be hit in real life. Report a smaller
2187          * maximum length to not under count the actual number of extents
2188          * needed for writing a file. */
2189         param->ddp_max_extent_blks = EXT_INIT_MAX_LEN >> 2;
2190         /* worst-case extent insertion metadata overhead */
2191         param->ddp_extent_tax = 6 * LDISKFS_BLOCK_SIZE(sb);
2192         param->ddp_mntopts      = 0;
2193         if (test_opt(sb, XATTR_USER))
2194                 param->ddp_mntopts |= MNTOPT_USERXATTR;
2195         if (test_opt(sb, POSIX_ACL))
2196                 param->ddp_mntopts |= MNTOPT_ACL;
2197
2198         /* LOD might calculate the max stripe count based on max_ea_size,
2199          * so we need take account in the overhead as well,
2200          * xattr_header + magic + xattr_entry_head */
2201         ea_overhead = sizeof(struct ldiskfs_xattr_header) + sizeof(__u32) +
2202                       LDISKFS_XATTR_LEN(XATTR_NAME_MAX_LEN);
2203
2204 #if defined(LDISKFS_FEATURE_INCOMPAT_EA_INODE)
2205         if (ldiskfs_has_feature_ea_inode(sb))
2206                 param->ddp_max_ea_size = LDISKFS_XATTR_MAX_LARGE_EA_SIZE -
2207                                                                 ea_overhead;
2208         else
2209 #endif
2210                 param->ddp_max_ea_size = sb->s_blocksize - ea_overhead;
2211
2212         if (param->ddp_max_ea_size > OSD_MAX_EA_SIZE)
2213                 param->ddp_max_ea_size = OSD_MAX_EA_SIZE;
2214
2215         /* Preferred RPC size for efficient disk IO.  4MB shows good
2216          * all-around performance for ldiskfs, but use bigalloc chunk size
2217          * by default if larger. */
2218 #if defined(LDISKFS_CLUSTER_SIZE)
2219         if (LDISKFS_CLUSTER_SIZE(sb) > DT_DEF_BRW_SIZE)
2220                 param->ddp_brw_size = LDISKFS_CLUSTER_SIZE(sb);
2221         else
2222 #endif
2223                 param->ddp_brw_size = DT_DEF_BRW_SIZE;
2224
2225         param->ddp_t10_cksum_type = 0;
2226         if (bi) {
2227                 unsigned short interval = blk_integrity_interval(bi);
2228                 name = blk_integrity_name(bi);
2229                 /*
2230                  * Expected values:
2231                  * T10-DIF-TYPE1-CRC
2232                  * T10-DIF-TYPE3-CRC
2233                  * T10-DIF-TYPE1-IP
2234                  * T10-DIF-TYPE3-IP
2235                  */
2236                 if (strncmp(name, "T10-DIF-TYPE",
2237                             sizeof("T10-DIF-TYPE") - 1) == 0) {
2238                         /* also skip "1/3-" at end */
2239                         const int type_off = sizeof("T10-DIF-TYPE.");
2240
2241                         if (interval != 512 && interval != 4096)
2242                                 CERROR("%s: unsupported T10PI sector size %u\n",
2243                                        d->od_svname, interval);
2244                         else if (strcmp(name + type_off, "CRC") == 0)
2245                                 param->ddp_t10_cksum_type = interval == 512 ?
2246                                         OBD_CKSUM_T10CRC512 :
2247                                         OBD_CKSUM_T10CRC4K;
2248                         else if (strcmp(name + type_off, "IP") == 0)
2249                                 param->ddp_t10_cksum_type = interval == 512 ?
2250                                         OBD_CKSUM_T10IP512 :
2251                                         OBD_CKSUM_T10IP4K;
2252                         else
2253                                 CERROR("%s: unsupported checksum type of "
2254                                        "T10PI type '%s'",
2255                                        d->od_svname, name);
2256                 } else {
2257                         CERROR("%s: unsupported T10PI type '%s'",
2258                                d->od_svname, name);
2259                 }
2260         }
2261 }
2262
2263 /*
2264  * Concurrency: shouldn't matter.
2265  */
2266 static int osd_sync(const struct lu_env *env, struct dt_device *d)
2267 {
2268         int rc;
2269
2270         CDEBUG(D_CACHE, "%s: syncing OSD\n", osd_dt_dev(d)->od_svname);
2271
2272         rc = ldiskfs_force_commit(osd_sb(osd_dt_dev(d)));
2273
2274         CDEBUG(D_CACHE, "%s: synced OSD: rc = %d\n", osd_dt_dev(d)->od_svname,
2275                rc);
2276
2277         return rc;
2278 }
2279
2280 /**
2281  * Start commit for OSD device.
2282  *
2283  * An implementation of dt_commit_async method for OSD device.
2284  * Asychronously starts underlayng fs sync and thereby a transaction
2285  * commit.
2286  *
2287  * \param env environment
2288  * \param d dt device
2289  *
2290  * \see dt_device_operations
2291  */
2292 static int osd_commit_async(const struct lu_env *env,
2293                             struct dt_device *d)
2294 {
2295         struct super_block *s = osd_sb(osd_dt_dev(d));
2296         ENTRY;
2297
2298         CDEBUG(D_HA, "%s: async commit OSD\n", osd_dt_dev(d)->od_svname);
2299         RETURN(s->s_op->sync_fs(s, 0));
2300 }
2301
/* Our own copy of the set readonly functions if present, or NULL if not. */
2303 static int (*priv_dev_set_rdonly)(struct block_device *bdev);
2304 static int (*priv_dev_check_rdonly)(struct block_device *bdev);
2305 /* static int (*priv_dev_clear_rdonly)(struct block_device *bdev); */
2306
2307 /*
2308  * Concurrency: shouldn't matter.
2309  */
2310 static int osd_ro(const struct lu_env *env, struct dt_device *d)
2311 {
2312         struct super_block *sb = osd_sb(osd_dt_dev(d));
2313         struct block_device *dev = sb->s_bdev;
2314         int rc = -EOPNOTSUPP;
2315         ENTRY;
2316
2317         if (priv_dev_set_rdonly) {
2318                 struct block_device *jdev = LDISKFS_SB(sb)->journal_bdev;
2319
2320                 rc = 0;
2321                 CERROR("*** setting %s read-only ***\n",
2322                        osd_dt_dev(d)->od_svname);
2323
2324                 if (sb->s_op->freeze_fs) {
2325                         rc = sb->s_op->freeze_fs(sb);
2326                         if (rc)
2327                                 goto out;
2328                 }
2329
2330                 if (jdev && (jdev != dev)) {
2331                         CDEBUG(D_IOCTL | D_HA, "set journal dev %lx rdonly\n",
2332                                (long)jdev);
2333                         priv_dev_set_rdonly(jdev);
2334                 }
2335                 CDEBUG(D_IOCTL | D_HA, "set dev %lx rdonly\n", (long)dev);
2336                 priv_dev_set_rdonly(dev);
2337
2338                 if (sb->s_op->unfreeze_fs)
2339                         sb->s_op->unfreeze_fs(sb);
2340         }
2341
2342 out:
2343         if (rc)
2344                 CERROR("%s: %lx CANNOT BE SET READONLY: rc = %d\n",
2345                        osd_dt_dev(d)->od_svname, (long)dev, rc);
2346
2347         RETURN(rc);
2348 }
2349
2350 /**
2351  * Note: we do not count into QUOTA here.
2352  * If we mount with --data_journal we may need more.
2353  */
2354 const int osd_dto_credits_noquota[DTO_NR] = {
2355         /**
2356          * Insert.
2357          * INDEX_EXTRA_TRANS_BLOCKS(8) +
2358          * SINGLEDATA_TRANS_BLOCKS(8)
2359          * XXX Note: maybe iam need more, since iam have more level than
2360          *           EXT3 htree.
2361          */
2362         [DTO_INDEX_INSERT]  = 16,
2363         /**
2364          * Delete
2365          * just modify a single entry, probably merge few within a block
2366          */
2367         [DTO_INDEX_DELETE]  = 1,
2368         /**
2369          * Used for OI scrub
2370          */
2371         [DTO_INDEX_UPDATE]  = 16,
2372         /**
2373          * 4(inode, inode bits, groups, GDT)
2374          *   notice: OI updates are counted separately with DTO_INDEX_INSERT
2375          */
2376         [DTO_OBJECT_CREATE] = 4,
2377         /**
2378          * 4(inode, inode bits, groups, GDT)
2379          *   notice: OI updates are counted separately with DTO_INDEX_DELETE
2380          */
2381         [DTO_OBJECT_DELETE] = 4,
2382         /**
2383          * Attr set credits (inode)
2384          */
2385         [DTO_ATTR_SET_BASE] = 1,
2386         /**
2387          * Xattr set. The same as xattr of EXT3.
2388          * DATA_TRANS_BLOCKS(14)
2389          * XXX Note: in original MDS implmentation INDEX_EXTRA_TRANS_BLOCKS
2390          * are also counted in. Do not know why?
2391          */
2392         [DTO_XATTR_SET]     = 14,
2393         /**
2394          * credits for inode change during write.
2395          */
2396         [DTO_WRITE_BASE]    = 3,
2397         /**
2398          * credits for single block write.
2399          */
2400         [DTO_WRITE_BLOCK]   = 14,
2401         /**
2402          * Attr set credits for chown.
2403          * This is extra credits for setattr, and it is null without quota
2404          */
2405         [DTO_ATTR_SET_CHOWN] = 0
2406 };
2407
2408 static const struct dt_device_operations osd_dt_ops = {
2409         .dt_root_get       = osd_root_get,
2410         .dt_statfs         = osd_statfs,
2411         .dt_trans_create   = osd_trans_create,
2412         .dt_trans_start    = osd_trans_start,
2413         .dt_trans_stop     = osd_trans_stop,
2414         .dt_trans_cb_add   = osd_trans_cb_add,
2415         .dt_conf_get       = osd_conf_get,
2416         .dt_sync           = osd_sync,
2417         .dt_ro             = osd_ro,
2418         .dt_commit_async   = osd_commit_async,
2419 };
2420
2421 static void osd_read_lock(const struct lu_env *env, struct dt_object *dt,
2422                           unsigned role)
2423 {
2424         struct osd_object *obj = osd_dt_obj(dt);
2425         struct osd_thread_info *oti = osd_oti_get(env);
2426
2427         LINVRNT(osd_invariant(obj));
2428
2429         LASSERT(obj->oo_owner != env);
2430         down_read_nested(&obj->oo_sem, role);
2431
2432         LASSERT(obj->oo_owner == NULL);
2433         oti->oti_r_locks++;
2434 }
2435
2436 static void osd_write_lock(const struct lu_env *env, struct dt_object *dt,
2437                            unsigned role)
2438 {
2439         struct osd_object *obj = osd_dt_obj(dt);
2440         struct osd_thread_info *oti = osd_oti_get(env);
2441
2442         LINVRNT(osd_invariant(obj));
2443
2444         LASSERT(obj->oo_owner != env);
2445         down_write_nested(&obj->oo_sem, role);
2446
2447         LASSERT(obj->oo_owner == NULL);
2448         obj->oo_owner = env;
2449         oti->oti_w_locks++;
2450 }
2451
2452 static void osd_read_unlock(const struct lu_env *env, struct dt_object *dt)
2453 {
2454         struct osd_object *obj = osd_dt_obj(dt);
2455         struct osd_thread_info *oti = osd_oti_get(env);
2456
2457         LINVRNT(osd_invariant(obj));
2458
2459         LASSERT(oti->oti_r_locks > 0);
2460         oti->oti_r_locks--;
2461         up_read(&obj->oo_sem);
2462 }
2463
2464 static void osd_write_unlock(const struct lu_env *env, struct dt_object *dt)
2465 {
2466         struct osd_object *obj = osd_dt_obj(dt);
2467         struct osd_thread_info *oti = osd_oti_get(env);
2468
2469         LINVRNT(osd_invariant(obj));
2470
2471         LASSERT(obj->oo_owner == env);
2472         LASSERT(oti->oti_w_locks > 0);
2473         oti->oti_w_locks--;
2474         obj->oo_owner = NULL;
2475         up_write(&obj->oo_sem);
2476 }
2477
2478 static int osd_write_locked(const struct lu_env *env, struct dt_object *dt)
2479 {
2480         struct osd_object *obj = osd_dt_obj(dt);
2481
2482         LINVRNT(osd_invariant(obj));
2483
2484         return obj->oo_owner == env;
2485 }
2486
2487 static struct timespec *osd_inode_time(const struct lu_env *env,
2488                                        struct inode *inode, __u64 seconds)
2489 {
2490         struct osd_thread_info  *oti = osd_oti_get(env);
2491         struct timespec         *t   = &oti->oti_time;
2492
2493         t->tv_sec = seconds;
2494         t->tv_nsec = 0;
2495         *t = timespec_trunc(*t, inode->i_sb->s_time_gran);
2496         return t;
2497 }
2498
2499 static void osd_inode_getattr(const struct lu_env *env,
2500                               struct inode *inode, struct lu_attr *attr)
2501 {
2502         attr->la_valid  |= LA_ATIME | LA_MTIME | LA_CTIME | LA_MODE |
2503                            LA_SIZE | LA_BLOCKS | LA_UID | LA_GID |
2504                            LA_PROJID | LA_FLAGS | LA_NLINK | LA_RDEV |
2505                            LA_BLKSIZE | LA_TYPE;
2506
2507         attr->la_atime   = LTIME_S(inode->i_atime);
2508         attr->la_mtime   = LTIME_S(inode->i_mtime);
2509         attr->la_ctime   = LTIME_S(inode->i_ctime);
2510         attr->la_mode    = inode->i_mode;
2511         attr->la_size    = i_size_read(inode);
2512         attr->la_blocks  = inode->i_blocks;
2513         attr->la_uid     = i_uid_read(inode);
2514         attr->la_gid     = i_gid_read(inode);
2515         attr->la_projid  = i_projid_read(inode);
2516         attr->la_flags   = ll_inode_to_ext_flags(inode->i_flags);
2517         attr->la_nlink   = inode->i_nlink;
2518         attr->la_rdev    = inode->i_rdev;
2519         attr->la_blksize = 1 << inode->i_blkbits;
2520         attr->la_blkbits = inode->i_blkbits;
2521         /*
2522          * Ext4 did not transfer inherit flags from raw inode
2523          * to inode flags, and ext4 internally test raw inode
2524          * @i_flags directly. Instead of patching ext4, we do it here.
2525          */
2526         if (LDISKFS_I(inode)->i_flags & LUSTRE_PROJINHERIT_FL)
2527                 attr->la_flags |= LUSTRE_PROJINHERIT_FL;
2528 }
2529
2530 static int osd_attr_get(const struct lu_env *env,
2531                         struct dt_object *dt,
2532                         struct lu_attr *attr)
2533 {
2534         struct osd_object *obj = osd_dt_obj(dt);
2535
2536         if (unlikely(!dt_object_exists(dt)))
2537                 return -ENOENT;
2538         if (unlikely(obj->oo_destroyed))
2539                 return -ENOENT;
2540
2541         LASSERT(!dt_object_remote(dt));
2542         LINVRNT(osd_invariant(obj));
2543
2544         spin_lock(&obj->oo_guard);
2545         osd_inode_getattr(env, obj->oo_inode, attr);
2546         if (obj->oo_lma_flags & LUSTRE_ORPHAN_FL)
2547                 attr->la_flags |= LUSTRE_ORPHAN_FL;
2548         spin_unlock(&obj->oo_guard);
2549
2550         return 0;
2551 }
2552
2553 static int osd_declare_attr_qid(const struct lu_env *env,
2554                                 struct osd_object *obj,
2555                                 struct osd_thandle *oh, long long bspace,
2556                                 qid_t old_id, qid_t new_id, bool enforce,
2557                                 unsigned type, bool ignore_edquot)
2558 {
2559         int rc;
2560         struct osd_thread_info *info = osd_oti_get(env);
2561         struct lquota_id_info  *qi = &info->oti_qi;
2562
2563         qi->lqi_type = type;
2564         /* inode accounting */
2565         qi->lqi_is_blk = false;
2566
2567         /* one more inode for the new id ... */
2568         qi->lqi_id.qid_uid = new_id;
2569         qi->lqi_space      = 1;
2570         /* Reserve credits for the new id */
2571         rc = osd_declare_qid(env, oh, qi, NULL, enforce, NULL);
2572         if (ignore_edquot && (rc == -EDQUOT || rc == -EINPROGRESS))
2573                 rc = 0;
2574         if (rc)
2575                 RETURN(rc);
2576
2577         /* and one less inode for the current id */
2578         qi->lqi_id.qid_uid = old_id;
2579         qi->lqi_space      = -1;
2580         rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
2581         if (ignore_edquot && (rc == -EDQUOT || rc == -EINPROGRESS))
2582                 rc = 0;
2583         if (rc)
2584                 RETURN(rc);
2585
2586         /* block accounting */
2587         qi->lqi_is_blk = true;
2588
2589         /* more blocks for the new id ... */
2590         qi->lqi_id.qid_uid = new_id;
2591         qi->lqi_space      = bspace;
2592         /*
2593          * Credits for the new uid has been reserved, re-use "obj"
2594          * to save credit reservation.
2595          */
2596         rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
2597         if (ignore_edquot && (rc == -EDQUOT || rc == -EINPROGRESS))
2598                 rc = 0;
2599         if (rc)
2600                 RETURN(rc);
2601
2602         /* and finally less blocks for the current uid */
2603         qi->lqi_id.qid_uid = old_id;
2604         qi->lqi_space      = -bspace;
2605         rc = osd_declare_qid(env, oh, qi, obj, enforce, NULL);
2606         if (ignore_edquot && (rc == -EDQUOT || rc == -EINPROGRESS))
2607                 rc = 0;
2608
2609         RETURN(rc);
2610 }
2611
2612 static int osd_declare_attr_set(const struct lu_env *env,
2613                                 struct dt_object *dt,
2614                                 const struct lu_attr *attr,
2615                                 struct thandle *handle)
2616 {
2617         struct osd_thandle     *oh;
2618         struct osd_object      *obj;
2619         qid_t                   uid;
2620         qid_t                   gid;
2621         long long               bspace;
2622         int                     rc = 0;
2623         bool                    enforce;
2624         ENTRY;
2625
2626         LASSERT(dt != NULL);
2627         LASSERT(handle != NULL);
2628
2629         obj = osd_dt_obj(dt);
2630         LASSERT(osd_invariant(obj));
2631
2632         oh = container_of0(handle, struct osd_thandle, ot_super);
2633         LASSERT(oh->ot_handle == NULL);
2634
2635         osd_trans_declare_op(env, oh, OSD_OT_ATTR_SET,
2636                              osd_dto_credits_noquota[DTO_ATTR_SET_BASE]);
2637
2638         osd_trans_declare_op(env, oh, OSD_OT_XATTR_SET,
2639                              osd_dto_credits_noquota[DTO_XATTR_SET]);
2640
2641         if (attr == NULL || obj->oo_inode == NULL)
2642                 RETURN(rc);
2643
2644         bspace   = obj->oo_inode->i_blocks << 9;
2645         bspace   = toqb(bspace);
2646
2647         /* Changing ownership is always preformed by super user, it should not
2648          * fail with EDQUOT unless required explicitly.
2649          *
2650          * We still need to call the osd_declare_qid() to calculate the journal
2651          * credits for updating quota accounting files and to trigger quota
2652          * space adjustment once the operation is completed.*/
2653         if (attr->la_valid & LA_UID || attr->la_valid & LA_GID) {
2654                 bool ignore_edquot = !(attr->la_flags & LUSTRE_SET_SYNC_FL);
2655
2656                 if (!ignore_edquot)
2657                         CDEBUG(D_QUOTA, "%s: enforce quota on UID %u, GID %u"
2658                                "(the quota space is %lld)\n",
2659                                obj->oo_inode->i_sb->s_id, attr->la_uid,
2660                                attr->la_gid, bspace);
2661
2662                 /* USERQUOTA */
2663                 uid = i_uid_read(obj->oo_inode);
2664                 enforce = (attr->la_valid & LA_UID) && (attr->la_uid != uid);
2665                 rc = osd_declare_attr_qid(env, obj, oh, bspace, uid,
2666                                           attr->la_uid, enforce, USRQUOTA,
2667                                           true);
2668                 if (rc)
2669                         RETURN(rc);
2670
2671                 gid = i_gid_read(obj->oo_inode);
2672                 enforce = (attr->la_valid & LA_GID) && (attr->la_gid != gid);
2673                 rc = osd_declare_attr_qid(env, obj, oh, bspace,
2674                                           i_gid_read(obj->oo_inode),
2675                                           attr->la_gid, enforce, GRPQUOTA,
2676                                           ignore_edquot);
2677                 if (rc)
2678                         RETURN(rc);
2679
2680         }
2681 #ifdef HAVE_PROJECT_QUOTA
2682         if (attr->la_valid & LA_PROJID) {
2683                 __u32 projid = i_projid_read(obj->oo_inode);
2684                 enforce = (attr->la_valid & LA_PROJID) &&
2685                                         (attr->la_projid != projid);
2686                 rc = osd_declare_attr_qid(env, obj, oh, bspace,
2687                                           (qid_t)projid, (qid_t)attr->la_projid,
2688                                           enforce, PRJQUOTA, true);
2689                 if (rc)
2690                         RETURN(rc);
2691         }
2692 #endif
2693         RETURN(rc);
2694 }
2695
2696 static int osd_inode_setattr(const struct lu_env *env,
2697                              struct inode *inode, const struct lu_attr *attr)
2698 {
2699         __u64 bits = attr->la_valid;
2700
2701         /* Only allow set size for regular file */
2702         if (!S_ISREG(inode->i_mode))
2703                 bits &= ~(LA_SIZE | LA_BLOCKS);
2704
2705         if (bits == 0)
2706                 return 0;
2707
2708         if (bits & LA_ATIME)
2709                 inode->i_atime  = *osd_inode_time(env, inode, attr->la_atime);
2710         if (bits & LA_CTIME)
2711                 inode->i_ctime  = *osd_inode_time(env, inode, attr->la_ctime);
2712         if (bits & LA_MTIME)
2713                 inode->i_mtime  = *osd_inode_time(env, inode, attr->la_mtime);
2714         if (bits & LA_SIZE) {
2715                 spin_lock(&inode->i_lock);
2716                 LDISKFS_I(inode)->i_disksize = attr->la_size;
2717                 i_size_write(inode, attr->la_size);
2718                 spin_unlock(&inode->i_lock);
2719         }
2720
2721         /* OSD should not change "i_blocks" which is used by quota.
2722          * "i_blocks" should be changed by ldiskfs only. */
2723         if (bits & LA_MODE)
2724                 inode->i_mode = (inode->i_mode & S_IFMT) |
2725                                 (attr->la_mode & ~S_IFMT);
2726         if (bits & LA_UID)
2727                 i_uid_write(inode, attr->la_uid);
2728         if (bits & LA_GID)
2729                 i_gid_write(inode, attr->la_gid);
2730         if (bits & LA_PROJID)
2731                 i_projid_write(inode, attr->la_projid);
2732         if (bits & LA_NLINK)
2733                 set_nlink(inode, attr->la_nlink);
2734         if (bits & LA_RDEV)
2735                 inode->i_rdev = attr->la_rdev;
2736
2737         if (bits & LA_FLAGS) {
2738                 /* always keep S_NOCMTIME */
2739                 inode->i_flags = ll_ext_to_inode_flags(attr->la_flags) |
2740                                  S_NOCMTIME;
2741                 /*
2742                  * Ext4 did not transfer inherit flags from
2743                  * @inode->i_flags to raw inode i_flags when writing
2744                  * flags, we do it explictly here.
2745                  */
2746                 if (attr->la_flags & LUSTRE_PROJINHERIT_FL)
2747                         LDISKFS_I(inode)->i_flags |= LUSTRE_PROJINHERIT_FL;
2748                 else
2749                         LDISKFS_I(inode)->i_flags &= ~LUSTRE_PROJINHERIT_FL;
2750         }
2751         return 0;
2752 }
2753
#ifdef HAVE_PROJECT_QUOTA
/**
 * Transfer the quota usage of an inode to another project id.
 *
 * \param inode		inode changing project
 * \param projid	project id to transfer to
 *
 * \retval 0		success, nothing to do, or no dquot could be
 *			obtained for \a projid (the transfer is skipped)
 * \retval -EOPNOTSUPP	the filesystem lacks the project feature and
 *			\a projid is not the default one, or the on-disk
 *			inode is not larger than the good old inode size
 * \retval -EOVERFLOW	the i_projid field does not fit in the raw inode
 * \retval negative	error from ldiskfs_get_inode_loc() or
 *			__dquot_transfer()
 */
static int osd_transfer_project(struct inode *inode, __u32 projid)
{
	struct super_block *sb = inode->i_sb;
	struct ldiskfs_inode_info *ei = LDISKFS_I(inode);
	struct dquot *transfer_to[LDISKFS_MAXQUOTAS] = { };
	struct ldiskfs_inode *raw_inode;
	struct ldiskfs_iloc iloc;
	kprojid_t kprojid;
	bool fits;
	int err;

	if (!ldiskfs_has_feature_project(sb)) {
		LASSERT(__kprojid_val(LDISKFS_I(inode)->i_projid)
			== LDISKFS_DEF_PROJID);
		if (projid != LDISKFS_DEF_PROJID)
			return -EOPNOTSUPP;

		return 0;
	}

	if (LDISKFS_INODE_SIZE(sb) <= LDISKFS_GOOD_OLD_INODE_SIZE)
		return -EOPNOTSUPP;

	kprojid = make_kprojid(&init_user_ns, (projid_t)projid);
	if (projid_eq(kprojid, LDISKFS_I(inode)->i_projid))
		return 0;

	err = ldiskfs_get_inode_loc(inode, &iloc);
	if (err)
		return err;

	/* the raw inode is only needed for the size check */
	raw_inode = ldiskfs_raw_inode(&iloc);
	fits = LDISKFS_FITS_IN_INODE(raw_inode, ei, i_projid);
	brelse(iloc.bh);
	if (!fits)
		return -EOVERFLOW;

	dquot_initialize(inode);
	transfer_to[PRJQUOTA] = dqget(sb, make_kqid_projid(kprojid));
	/*
	 * Depending on the kernel, dqget() reports failure either as NULL
	 * or as an ERR_PTR() value.  Neither may be handed to
	 * __dquot_transfer() or dqput(), so the transfer is skipped for
	 * both, as it already was for NULL.
	 */
	if (IS_ERR_OR_NULL(transfer_to[PRJQUOTA]))
		return 0;

	err = __dquot_transfer(inode, transfer_to);
	dqput(transfer_to[PRJQUOTA]);

	return err;
}
#endif
2805
2806 static int osd_quota_transfer(struct inode *inode, const struct lu_attr *attr)
2807 {
2808         int rc;
2809
2810         if ((attr->la_valid & LA_UID && attr->la_uid != i_uid_read(inode)) ||
2811             (attr->la_valid & LA_GID && attr->la_gid != i_gid_read(inode))) {
2812                 struct iattr    iattr;
2813
2814                 ll_vfs_dq_init(inode);
2815                 iattr.ia_valid = 0;
2816                 if (attr->la_valid & LA_UID)
2817                         iattr.ia_valid |= ATTR_UID;
2818                 if (attr->la_valid & LA_GID)
2819                         iattr.ia_valid |= ATTR_GID;
2820                 iattr.ia_uid = make_kuid(&init_user_ns, attr->la_uid);
2821                 iattr.ia_gid = make_kgid(&init_user_ns, attr->la_gid);
2822
2823                 rc = ll_vfs_dq_transfer(inode, &iattr);
2824                 if (rc) {
2825                         CERROR("%s: quota transfer failed: rc = %d. Is quota "
2826                                "enforcement enabled on the ldiskfs "
2827                                "filesystem?\n", inode->i_sb->s_id, rc);
2828                         return rc;
2829                 }
2830         }
2831
2832         /* Handle project id transfer here properly */
2833         if (attr->la_valid & LA_PROJID &&
2834             attr->la_projid != i_projid_read(inode)) {
2835 #ifdef HAVE_PROJECT_QUOTA
2836                 rc = osd_transfer_project(inode, attr->la_projid);
2837 #else
2838                 rc = -ENOTSUPP;
2839 #endif
2840                 if (rc) {
2841                         CERROR("%s: quota transfer failed: rc = %d. Is project "
2842                                "enforcement enabled on the ldiskfs "
2843                                "filesystem?\n", inode->i_sb->s_id, rc);
2844                         return rc;
2845                 }
2846         }
2847         return 0;
2848 }
2849
2850 static int osd_attr_set(const struct lu_env *env,
2851                         struct dt_object *dt,
2852                         const struct lu_attr *attr,
2853                         struct thandle *handle)
2854 {
2855         struct osd_object *obj = osd_dt_obj(dt);
2856         struct inode      *inode;
2857         int rc;
2858
2859         if (!dt_object_exists(dt))
2860                 return -ENOENT;
2861
2862         LASSERT(handle != NULL);
2863         LASSERT(!dt_object_remote(dt));
2864         LASSERT(osd_invariant(obj));
2865
2866         osd_trans_exec_op(env, handle, OSD_OT_ATTR_SET);
2867
2868         if (OBD_FAIL_CHECK(OBD_FAIL_OSD_FID_MAPPING) &&
2869             !osd_obj2dev(obj)->od_is_ost) {
2870                 struct osd_thread_info  *oti  = osd_oti_get(env);
2871                 const struct lu_fid     *fid0 = lu_object_fid(&dt->do_lu);
2872                 struct lu_fid           *fid1 = &oti->oti_fid;
2873                 struct osd_inode_id     *id   = &oti->oti_id;
2874                 struct iam_path_descr   *ipd;
2875                 struct iam_container    *bag;
2876                 struct osd_thandle      *oh;
2877                 int                      rc;
2878
2879                 fid_cpu_to_be(fid1, fid0);
2880                 memset(id, 1, sizeof(*id));
2881                 bag = &osd_fid2oi(osd_dev(dt->do_lu.lo_dev),
2882                                   fid0)->oi_dir.od_container;
2883                 ipd = osd_idx_ipd_get(env, bag);
2884                 if (unlikely(ipd == NULL))
2885                         RETURN(-ENOMEM);
2886
2887                 oh = container_of0(handle, struct osd_thandle, ot_super);
2888                 rc = iam_update(oh->ot_handle, bag, (const struct iam_key *)fid1,
2889                                 (const struct iam_rec *)id, ipd);
2890                 osd_ipd_put(env, bag, ipd);
2891                 return(rc > 0 ? 0 : rc);
2892         }
2893
2894         inode = obj->oo_inode;
2895
2896         rc = osd_quota_transfer(inode, attr);
2897         if (rc)
2898                 return rc;
2899
2900         spin_lock(&obj->oo_guard);
2901         rc = osd_inode_setattr(env, inode, attr);
2902         spin_unlock(&obj->oo_guard);
2903         if (rc != 0)
2904                 GOTO(out, rc);
2905
2906         ll_dirty_inode(inode, I_DIRTY_DATASYNC);
2907
2908         if (!(attr->la_valid & LA_FLAGS))
2909                 GOTO(out, rc);
2910
2911         /* Let's check if there are extra flags need to be set into LMA */
2912         if (attr->la_flags & LUSTRE_LMA_FL_MASKS) {
2913                 struct osd_thread_info *info = osd_oti_get(env);
2914                 struct lustre_mdt_attrs *lma = &info->oti_ost_attrs.loa_lma;
2915
2916                 LASSERT(!obj->oo_pfid_in_lma);
2917
2918                 rc = osd_get_lma(info, inode, &info->oti_obj_dentry,
2919                                  &info->oti_ost_attrs);
2920                 if (rc)
2921                         GOTO(out, rc);
2922
2923                 lma->lma_incompat |=
2924                         lustre_to_lma_flags(attr->la_flags);
2925                 lustre_lma_swab(lma);
2926                 rc = __osd_xattr_set(info, inode, XATTR_NAME_LMA,
2927                                      lma, sizeof(*lma), XATTR_REPLACE);
2928                 if (rc != 0) {
2929                         struct osd_device *osd = osd_obj2dev(obj);
2930
2931                         CWARN("%s: set "DFID" lma flags %u failed: rc = %d\n",
2932                               osd_name(osd), PFID(lu_object_fid(&dt->do_lu)),
2933                               lma->lma_incompat, rc);
2934                 } else {
2935                         obj->oo_lma_flags =
2936                                 attr->la_flags & LUSTRE_LMA_FL_MASKS;
2937                 }
2938                 osd_trans_exec_check(env, handle, OSD_OT_XATTR_SET);
2939         }
2940 out:
2941         osd_trans_exec_check(env, handle, OSD_OT_ATTR_SET);
2942
2943         return rc;
2944 }
2945
2946 static struct dentry *osd_child_dentry_get(const struct lu_env *env,
2947                                            struct osd_object *obj,
2948                                            const char *name, const int namelen)
2949 {
2950         return osd_child_dentry_by_inode(env, obj->oo_inode, name, namelen);
2951 }
2952
2953 static int osd_mkfile(struct osd_thread_info *info, struct osd_object *obj,
2954                       umode_t mode, struct dt_allocation_hint *hint,
2955                       struct thandle *th)
2956 {
2957         int result;
2958         struct osd_device  *osd = osd_obj2dev(obj);
2959         struct osd_thandle *oth;
2960         struct dt_object   *parent = NULL;
2961         struct inode       *inode;
2962
2963         LINVRNT(osd_invariant(obj));
2964         LASSERT(obj->oo_inode == NULL);
2965         LASSERT(obj->oo_hl_head == NULL);
2966
2967         if (S_ISDIR(mode) && ldiskfs_pdo) {
2968                 obj->oo_hl_head =ldiskfs_htree_lock_head_alloc(HTREE_HBITS_DEF);
2969                 if (obj->oo_hl_head == NULL)
2970                         return -ENOMEM;
2971         }
2972
2973         oth = container_of(th, struct osd_thandle, ot_super);
2974         LASSERT(oth->ot_handle->h_transaction != NULL);
2975
2976         if (hint != NULL && hint->dah_parent != NULL &&
2977             !dt_object_remote(hint->dah_parent))
2978                 parent = hint->dah_parent;
2979
2980         inode = ldiskfs_create_inode(oth->ot_handle,
2981                                      parent ? osd_dt_obj(parent)->oo_inode :
2982                                               osd_sb(osd)->s_root->d_inode,
2983                                      mode);
2984         if (!IS_ERR(inode)) {
2985                 /* Do not update file c/mtime in ldiskfs. */
2986                 inode->i_flags |= S_NOCMTIME;
2987
2988                 /* For new created object, it must be consistent,
2989                  * and it is unnecessary to scrub against it. */
2990                 ldiskfs_set_inode_state(inode, LDISKFS_STATE_LUSTRE_NOSCRUB);
2991
2992                 obj->oo_inode = inode;
2993                 result = 0;
2994         } else {
2995                 if (obj->oo_hl_head != NULL) {
2996                         ldiskfs_htree_lock_head_free(obj->oo_hl_head);
2997                         obj->oo_hl_head = NULL;
2998                 }
2999                 result = PTR_ERR(inode);
3000         }
3001         LINVRNT(osd_invariant(obj));
3002         return result;
3003 }
3004
/* Name length constant used by this OSD. */
enum {
	OSD_NAME_LEN = 255,
};
3008
3009 static int osd_mkdir(struct osd_thread_info *info, struct osd_object *obj,
3010                      struct lu_attr *attr,
3011                      struct dt_allocation_hint *hint,
3012                      struct dt_object_format *dof,
3013                      struct thandle *th)
3014 {
3015         int result;
3016         struct osd_thandle *oth;
3017         __u32 mode = (attr->la_mode & (S_IFMT | S_IRWXUGO | S_ISVTX | S_ISGID));
3018
3019         LASSERT(S_ISDIR(attr->la_mode));
3020
3021         oth = container_of(th, struct osd_thandle, ot_super);
3022         LASSERT(oth->ot_handle->h_transaction != NULL);
3023         result = osd_mkfile(info, obj, mode, hint, th);
3024
3025         return result;
3026 }
3027
3028 static int osd_mk_index(struct osd_thread_info *info, struct osd_object *obj,
3029                         struct lu_attr *attr,
3030                         struct dt_allocation_hint *hint,
3031                         struct dt_object_format *dof,
3032                         struct thandle *th)
3033 {
3034         int result;
3035         struct osd_thandle *oth;
3036         const struct dt_index_features *feat = dof->u.dof_idx.di_feat;
3037
3038         __u32 mode = (attr->la_mode & (S_IFMT | S_IALLUGO | S_ISVTX));
3039
3040         LASSERT(S_ISREG(attr->la_mode));
3041
3042         oth = container_of(th, struct osd_thandle, ot_super);
3043         LASSERT(oth->ot_handle->h_transaction != NULL);
3044
3045         result = osd_mkfile(info, obj, mode, hint, th);
3046         if (result == 0) {
3047                 LASSERT(obj->oo_inode != NULL);
3048                 if (feat->dif_flags & DT_IND_VARKEY)
3049                         result = iam_lvar_create(obj->oo_inode,
3050                                                  feat->dif_keysize_max,
3051                                                  feat->dif_ptrsize,
3052                                                  feat->dif_recsize_max,
3053                                                  oth->ot_handle);
3054                 else
3055                         result = iam_lfix_create(obj->oo_inode,
3056                                                  feat->dif_keysize_max,
3057                                                  feat->dif_ptrsize,
3058                                                  feat->dif_recsize_max,
3059                                                  oth->ot_handle);
3060
3061         }
3062         return result;
3063 }
3064
3065 static int osd_mkreg(struct osd_thread_info *info, struct osd_object *obj,
3066                      struct lu_attr *attr,
3067                      struct dt_allocation_hint *hint,
3068                      struct dt_object_format *dof,
3069                      struct thandle *th)
3070 {
3071         LASSERT(S_ISREG(attr->la_mode));
3072         return osd_mkfile(info, obj, (attr->la_mode &
3073                                (S_IFMT | S_IALLUGO | S_ISVTX)), hint, th);
3074 }
3075
3076 static int osd_mksym(struct osd_thread_info *info, struct osd_object *obj,
3077                      struct lu_attr *attr,
3078                      struct dt_allocation_hint *hint,
3079                      struct dt_object_format *dof,
3080                      struct thandle *th)
3081 {
3082         LASSERT(S_ISLNK(attr->la_mode));
3083         return osd_mkfile(info, obj, (attr->la_mode &
3084                               (S_IFMT | S_IALLUGO | S_ISVTX)), hint, th);
3085 }
3086
3087 static int osd_mknod(struct osd_thread_info *info, struct osd_object *obj,
3088                      struct lu_attr *attr,
3089                      struct dt_allocation_hint *hint,
3090                      struct dt_object_format *dof,
3091                      struct thandle *th)
3092 {
3093         umode_t mode = attr->la_mode & (S_IFMT | S_IALLUGO | S_ISVTX);
3094         int result;
3095
3096         LINVRNT(osd_invariant(obj));
3097         LASSERT(obj->oo_inode == NULL);
3098         LASSERT(S_ISCHR(mode) || S_ISBLK(mode) ||
3099                 S_ISFIFO(mode) || S_ISSOCK(mode));
3100
3101         result = osd_mkfile(info, obj, mode, hint, th);
3102         if (result == 0) {
3103                 LASSERT(obj->oo_inode != NULL);
3104                 /*
3105                  * This inode should be marked dirty for i_rdev.  Currently
3106                  * that is done in the osd_attr_init().
3107                  */
3108                 init_special_inode(obj->oo_inode, obj->oo_inode->i_mode,
3109                                    attr->la_rdev);
3110         }
3111         LINVRNT(osd_invariant(obj));
3112         return result;
3113 }
3114
3115 typedef int (*osd_obj_type_f)(struct osd_thread_info *, struct osd_object *,
3116                               struct lu_attr *,
3117                               struct dt_allocation_hint *hint,
3118                               struct dt_object_format *dof,
3119                               struct thandle *);
3120
3121 static osd_obj_type_f osd_create_type_f(enum dt_format_type type)
3122 {
3123         osd_obj_type_f result;
3124
3125         switch (type) {
3126         case DFT_DIR:
3127                 result = osd_mkdir;
3128                 break;
3129         case DFT_REGULAR:
3130                 result = osd_mkreg;
3131                 break;
3132         case DFT_SYM:
3133                 result = osd_mksym;
3134                 break;
3135         case DFT_NODE:
3136                 result = osd_mknod;
3137                 break;
3138         case DFT_INDEX:
3139                 result = osd_mk_index;
3140                 break;
3141
3142         default:
3143                 LBUG();
3144                 break;
3145         }
3146         return result;
3147 }
3148
3149
3150 static void osd_ah_init(const struct lu_env *env, struct dt_allocation_hint *ah,
3151                         struct dt_object *parent, struct dt_object *child,
3152                         umode_t child_mode)
3153 {
3154         LASSERT(ah);
3155
3156         ah->dah_parent = parent;
3157         ah->dah_mode = child_mode;
3158
3159         if (parent != NULL && !dt_object_remote(parent)) {
3160                 /* will help to find FID->ino at dt_insert("..") */
3161                 struct osd_object *pobj = osd_dt_obj(parent);
3162                 osd_idc_find_and_init(env, osd_obj2dev(pobj), pobj);
3163         }
3164 }
3165
3166 static void osd_attr_init(struct osd_thread_info *info, struct osd_object *obj,
3167                        &nbs