Whamcloud - gitweb
LU-7813 lov: rename LOV_MAGIC_V*_DEF to *_DEFINED
[fs/lustre-release.git] / lustre / osd-ldiskfs / osd_oi.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2016, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/osd/osd_oi.c
33  *
34  * Object Index.
35  *
36  * Author: Nikita Danilov <nikita@clusterfs.com>
37  */
38
39 #define DEBUG_SUBSYSTEM S_OSD
40
41 #include <linux/module.h>
42
43 /*
44  * struct OBD_{ALLOC,FREE}*()
45  * OBD_FAIL_CHECK
46  */
47 #include <obd.h>
48 #include <obd_support.h>
49
50 /* fid_cpu_to_be() */
51 #include <lustre_fid.h>
52 #include <dt_object.h>
53
54 #include "osd_oi.h"
55 /* osd_lookup(), struct osd_thread_info */
56 #include "osd_internal.h"
57 #include "osd_scrub.h"
58
59 static unsigned int osd_oi_count = OSD_OI_FID_NR;
60 module_param(osd_oi_count, int, 0444);
61 MODULE_PARM_DESC(osd_oi_count, "Number of Object Index containers to be created, it's only valid for new filesystem.");
62
63 /** to serialize concurrent OI index initialization */
64 static struct mutex oi_init_lock;
65
66 static struct dt_index_features oi_feat = {
67         .dif_flags       = DT_IND_UPDATE,
68         .dif_recsize_min = sizeof(struct osd_inode_id),
69         .dif_recsize_max = sizeof(struct osd_inode_id),
70         .dif_ptrsize     = 4
71 };
72
73 #define OSD_OI_NAME_BASE        "oi.16"
74
75 static void osd_oi_table_put(struct osd_thread_info *info,
76                              struct osd_oi **oi_table, unsigned oi_count)
77 {
78         struct iam_container *bag;
79         int                   i;
80
81         for (i = 0; i < oi_count; i++) {
82                 if (oi_table[i] == NULL)
83                         continue;
84
85                 LASSERT(oi_table[i]->oi_inode != NULL);
86
87                 bag = &(oi_table[i]->oi_dir.od_container);
88                 if (bag->ic_object == oi_table[i]->oi_inode)
89                         iam_container_fini(bag);
90                 iput(oi_table[i]->oi_inode);
91                 oi_table[i]->oi_inode = NULL;
92                 OBD_FREE_PTR(oi_table[i]);
93                 oi_table[i] = NULL;
94         }
95 }
96
97 static int osd_oi_index_create_one(struct osd_thread_info *info,
98                                    struct osd_device *osd, const char *name,
99                                    struct dt_index_features *feat)
100 {
101         const struct lu_env             *env = info->oti_env;
102         struct osd_inode_id             *id  = &info->oti_id;
103         struct buffer_head              *bh;
104         struct inode                    *inode;
105         struct ldiskfs_dir_entry_2      *de;
106         struct dentry                   *dentry;
107         struct super_block              *sb  = osd_sb(osd);
108         struct inode                    *dir = sb->s_root->d_inode;
109         handle_t                        *jh;
110         int                              rc;
111
112         dentry = osd_child_dentry_by_inode(env, dir, name, strlen(name));
113         bh = osd_ldiskfs_find_entry(dir, &dentry->d_name, &de, NULL, NULL);
114         if (!IS_ERR(bh)) {
115                 osd_id_gen(id, le32_to_cpu(de->inode), OSD_OII_NOGEN);
116                 brelse(bh);
117                 inode = osd_iget(info, osd, id);
118                 if (!IS_ERR(inode)) {
119                         iput(inode);
120                         inode = ERR_PTR(-EEXIST);
121                 }
122                 return PTR_ERR(inode);
123         }
124
125         jh = osd_journal_start_sb(sb, LDISKFS_HT_MISC, 100);
126         if (IS_ERR(jh))
127                 return PTR_ERR(jh);
128
129         inode = ldiskfs_create_inode(jh, dir, (S_IFREG | S_IRUGO | S_IWUSR));
130         if (IS_ERR(inode)) {
131                 ldiskfs_journal_stop(jh);
132                 return PTR_ERR(inode);
133         }
134
135         ldiskfs_set_inode_state(inode, LDISKFS_STATE_LUSTRE_NOSCRUB);
136         unlock_new_inode(inode);
137
138         if (feat->dif_flags & DT_IND_VARKEY)
139                 rc = iam_lvar_create(inode, feat->dif_keysize_max,
140                                      feat->dif_ptrsize, feat->dif_recsize_max,
141                                      jh);
142         else
143                 rc = iam_lfix_create(inode, feat->dif_keysize_max,
144                                      feat->dif_ptrsize, feat->dif_recsize_max,
145                                      jh);
146         dentry = osd_child_dentry_by_inode(env, dir, name, strlen(name));
147         rc = osd_ldiskfs_add_entry(info, osd, jh, dentry, inode, NULL);
148         ldiskfs_journal_stop(jh);
149         iput(inode);
150         return rc;
151 }
152
153 static struct inode *osd_oi_index_open(struct osd_thread_info *info,
154                                        struct osd_device *osd,
155                                        const char *name,
156                                        struct dt_index_features *f,
157                                        bool create)
158 {
159         struct dentry *dentry;
160         struct inode  *inode;
161         int            rc;
162
163         dentry = ll_lookup_one_len(name, osd_sb(osd)->s_root, strlen(name));
164         if (IS_ERR(dentry))
165                 return (void *) dentry;
166
167         if (dentry->d_inode) {
168                 LASSERT(!is_bad_inode(dentry->d_inode));
169                 inode = dentry->d_inode;
170                 atomic_inc(&inode->i_count);
171                 dput(dentry);
172                 return inode;
173         }
174
175         /* create */
176         dput(dentry);
177         shrink_dcache_parent(osd_sb(osd)->s_root);
178         if (!create)
179                 return ERR_PTR(-ENOENT);
180
181         rc = osd_oi_index_create_one(info, osd, name, f);
182         if (rc)
183                 return ERR_PTR(rc);
184
185         dentry = ll_lookup_one_len(name, osd_sb(osd)->s_root, strlen(name));
186         if (IS_ERR(dentry))
187                 return (void *) dentry;
188
189         if (dentry->d_inode) {
190                 LASSERT(!is_bad_inode(dentry->d_inode));
191                 inode = dentry->d_inode;
192                 atomic_inc(&inode->i_count);
193                 dput(dentry);
194                 return inode;
195         }
196
197         return ERR_PTR(-ENOENT);
198 }
199
200 /**
201  * Open an OI(Ojbect Index) container.
202  *
203  * \param       name    Name of OI container
204  * \param       objp    Pointer of returned OI
205  *
206  * \retval      0       success
207  * \retval      -ve     failure
208  */
209 static int osd_oi_open(struct osd_thread_info *info, struct osd_device *osd,
210                        char *name, struct osd_oi **oi_slot, bool create)
211 {
212         struct osd_directory *dir;
213         struct iam_container *bag;
214         struct inode         *inode;
215         struct osd_oi        *oi;
216         int                   rc;
217
218         ENTRY;
219
220         oi_feat.dif_keysize_min = sizeof(struct lu_fid);
221         oi_feat.dif_keysize_max = sizeof(struct lu_fid);
222
223         inode = osd_oi_index_open(info, osd, name, &oi_feat, create);
224         if (IS_ERR(inode))
225                 RETURN(PTR_ERR(inode));
226
227         /* 'What the @fid is' is not imporatant, because these objects
228          * have no OI mappings, and only are visible inside the OSD.*/
229         lu_igif_build(&info->oti_fid, inode->i_ino, inode->i_generation);
230         rc = osd_ea_fid_set(info, inode, &info->oti_fid, LMAC_NOT_IN_OI, 0);
231         if (rc != 0)
232                 GOTO(out_inode, rc);
233
234         OBD_ALLOC_PTR(oi);
235         if (oi == NULL)
236                 GOTO(out_inode, rc = -ENOMEM);
237
238         oi->oi_inode = inode;
239         dir = &oi->oi_dir;
240
241         bag = &dir->od_container;
242         rc = iam_container_init(bag, &dir->od_descr, inode);
243         if (rc < 0)
244                 GOTO(out_free, rc);
245
246         rc = iam_container_setup(bag);
247         if (rc < 0)
248                 GOTO(out_container, rc);
249
250         *oi_slot = oi;
251         RETURN(0);
252
253 out_container:
254         iam_container_fini(bag);
255 out_free:
256         OBD_FREE_PTR(oi);
257 out_inode:
258         iput(inode);
259         return rc;
260 }
261
262 /**
263  * Open OI(Object Index) table.
264  * If \a oi_count is zero, which means caller doesn't know how many OIs there
265  * will be, this function can either return 0 for new filesystem, or number
266  * of OIs on existed filesystem.
267  *
268  * If \a oi_count is non-zero, which means caller does know number of OIs on
269  * filesystem, this function should return the exactly same number on
270  * success, or error code in failure.
271  *
272  * \param     oi_count  Number of expected OI containers
273  * \param     create    Create OIs if doesn't exist
274  *
275  * \retval    +ve       number of opened OI containers
276  * \retval      0       no OI containers found
277  * \retval    -ve       failure
278  */
279 static int
280 osd_oi_table_open(struct osd_thread_info *info, struct osd_device *osd,
281                   struct osd_oi **oi_table, unsigned oi_count, bool create)
282 {
283         struct scrub_file *sf = &osd->od_scrub.os_file;
284         int                count = 0;
285         int                rc = 0;
286         int                i;
287         ENTRY;
288
289         /* NB: oi_count != 0 means that we have already created/known all OIs
290          * and have known exact number of OIs. */
291         LASSERT(oi_count <= OSD_OI_FID_NR_MAX);
292
293         for (i = 0; i < (oi_count != 0 ? oi_count : OSD_OI_FID_NR_MAX); i++) {
294                 char name[12];
295
296                 if (oi_table[i] != NULL) {
297                         count++;
298                         continue;
299                 }
300
301                 sprintf(name, "%s.%d", OSD_OI_NAME_BASE, i);
302                 rc = osd_oi_open(info, osd, name, &oi_table[i], create);
303                 if (rc == 0) {
304                         count++;
305                         continue;
306                 }
307
308                 if (rc == -ENOENT && create == false) {
309                         if (oi_count == 0)
310                                 return count;
311
312                         rc = 0;
313                         ldiskfs_set_bit(i, sf->sf_oi_bitmap);
314                         continue;
315                 }
316
317                 CERROR("%s: can't open %s: rc = %d\n",
318                        osd_dev2name(osd), name, rc);
319                 if (oi_count > 0)
320                         CERROR("%s: expect to open total %d OI files.\n",
321                                osd_dev2name(osd), oi_count);
322                 break;
323         }
324
325         if (rc < 0) {
326                 osd_oi_table_put(info, oi_table, oi_count > 0 ? oi_count : i);
327                 count = rc;
328         }
329
330         RETURN(count);
331 }
332
333 static int osd_remove_oi_one(struct dentry *parent, const char *name,
334                              int namelen)
335 {
336         struct dentry *child;
337         int rc;
338
339         child = ll_lookup_one_len(name, parent, namelen);
340         if (IS_ERR(child)) {
341                 rc = PTR_ERR(child);
342         } else {
343                 rc = ll_vfs_unlink(parent->d_inode, child);
344                 dput(child);
345         }
346
347         return rc == -ENOENT ? 0 : rc;
348 }
349
350 static int osd_remove_ois(struct osd_thread_info *info, struct osd_device *osd)
351 {
352         char name[16];
353         int namelen;
354         int rc;
355         int i;
356
357         for (i = 0; i < osd->od_scrub.os_file.sf_oi_count; i++) {
358                 namelen = snprintf(name, sizeof(name), "%s.%d",
359                                    OSD_OI_NAME_BASE, i);
360                 rc = osd_remove_oi_one(osd_sb(osd)->s_root, name, namelen);
361                 if (rc != 0) {
362                         CERROR("%s: fail to remove the stale OI file %s: "
363                                "rc = %d\n", osd_dev2name(osd), name, rc);
364                         return rc;
365                 }
366         }
367
368         namelen = snprintf(name, sizeof(name), "%s", OSD_OI_NAME_BASE);
369         rc = osd_remove_oi_one(osd_sb(osd)->s_root, name, namelen);
370         if (rc != 0)
371                 CERROR("%s: fail to remove the stale OI file %s: rc = %d\n",
372                        osd_dev2name(osd), name, rc);
373
374         return rc;
375 }
376
377 int osd_oi_init(struct osd_thread_info *info, struct osd_device *osd,
378                 bool restored)
379 {
380         struct osd_scrub  *scrub = &osd->od_scrub;
381         struct scrub_file *sf = &scrub->os_file;
382         struct osd_oi    **oi;
383         int                rc;
384         ENTRY;
385
386         if (restored) {
387                 rc = osd_remove_ois(info, osd);
388                 if (rc != 0)
389                         return rc;
390         }
391
392         OBD_ALLOC(oi, sizeof(*oi) * OSD_OI_FID_NR_MAX);
393         if (oi == NULL)
394                 RETURN(-ENOMEM);
395
396         mutex_lock(&oi_init_lock);
397         /* try to open existing multiple OIs first */
398         rc = osd_oi_table_open(info, osd, oi, sf->sf_oi_count, false);
399         if (rc < 0)
400                 GOTO(out, rc);
401
402         if (rc > 0) {
403                 if (rc == sf->sf_oi_count || sf->sf_oi_count == 0)
404                         GOTO(out, rc);
405
406                 osd_scrub_file_reset(scrub,
407                                      LDISKFS_SB(osd_sb(osd))->s_es->s_uuid,
408                                      SF_RECREATED);
409                 osd_oi_count = sf->sf_oi_count;
410                 goto create;
411         }
412
413         /* if previous failed then try found single OI from old filesystem */
414         rc = osd_oi_open(info, osd, OSD_OI_NAME_BASE, &oi[0], false);
415         if (rc == 0) { /* found single OI from old filesystem */
416                 ldiskfs_clear_bit(0, sf->sf_oi_bitmap);
417                 if (sf->sf_success_count == 0)
418                         /* XXX: There is one corner case that if the OI_scrub
419                          *      file crashed or lost and we regard it upgrade,
420                          *      then we allow IGIF lookup to bypass OI files.
421                          *
422                          *      The risk is that osd_fid_lookup() may found
423                          *      a wrong inode with the given IGIF especially
424                          *      when the MDT has performed file-level backup
425                          *      and restored after former upgrading from 1.8
426                          *      to 2.x. Fortunately, the osd_fid_lookup()can
427                          *      verify the inode to decrease the risk. */
428                         osd_scrub_file_reset(scrub,
429                                         LDISKFS_SB(osd_sb(osd))->s_es->s_uuid,
430                                         SF_UPGRADE);
431                 GOTO(out, rc = 1);
432         } else if (rc != -ENOENT) {
433                 CERROR("%s: can't open %s: rc = %d\n",
434                        osd_dev2name(osd), OSD_OI_NAME_BASE, rc);
435                 GOTO(out, rc);
436         }
437
438         if (sf->sf_oi_count > 0) {
439                 int i;
440
441                 memset(sf->sf_oi_bitmap, 0, SCRUB_OI_BITMAP_SIZE);
442                 for (i = 0; i < osd_oi_count; i++)
443                         ldiskfs_set_bit(i, sf->sf_oi_bitmap);
444                 osd_scrub_file_reset(scrub,
445                                      LDISKFS_SB(osd_sb(osd))->s_es->s_uuid,
446                                      SF_RECREATED);
447         }
448         sf->sf_oi_count = osd_oi_count;
449
450 create:
451         rc = osd_scrub_file_store(scrub);
452         if (rc < 0) {
453                 osd_oi_table_put(info, oi, sf->sf_oi_count);
454                 GOTO(out, rc);
455         }
456
457         /* No OIs exist, new filesystem, create OI objects */
458         rc = osd_oi_table_open(info, osd, oi, osd_oi_count, true);
459         LASSERT(ergo(rc >= 0, rc == osd_oi_count));
460
461         GOTO(out, rc);
462
463 out:
464         if (rc < 0) {
465                 OBD_FREE(oi, sizeof(*oi) * OSD_OI_FID_NR_MAX);
466         } else {
467                 LASSERT((rc & (rc - 1)) == 0);
468                 osd->od_oi_table = oi;
469                 osd->od_oi_count = rc;
470                 if (sf->sf_oi_count != rc) {
471                         sf->sf_oi_count = rc;
472                         rc = osd_scrub_file_store(scrub);
473                         if (rc < 0) {
474                                 osd_oi_table_put(info, oi, sf->sf_oi_count);
475                                 OBD_FREE(oi, sizeof(*oi) * OSD_OI_FID_NR_MAX);
476                         }
477                 } else {
478                         rc = 0;
479                 }
480         }
481
482         mutex_unlock(&oi_init_lock);
483         return rc;
484 }
485
486 void osd_oi_fini(struct osd_thread_info *info, struct osd_device *osd)
487 {
488         if (unlikely(osd->od_oi_table == NULL))
489                 return;
490
491         osd_oi_table_put(info, osd->od_oi_table, osd->od_oi_count);
492
493         OBD_FREE(osd->od_oi_table,
494                  sizeof(*(osd->od_oi_table)) * OSD_OI_FID_NR_MAX);
495         osd->od_oi_table = NULL;
496 }
497
498 static inline int fid_is_fs_root(const struct lu_fid *fid)
499 {
500         /* Map root inode to special local object FID */
501         return (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE &&
502                          fid_oid(fid) == OSD_FS_ROOT_OID));
503 }
504
505 static int osd_oi_iam_lookup(struct osd_thread_info *oti,
506                              struct osd_oi *oi, struct dt_rec *rec,
507                              const struct dt_key *key)
508 {
509         struct iam_container  *bag;
510         struct iam_iterator   *it = &oti->oti_idx_it;
511         struct iam_path_descr *ipd;
512         int                    rc;
513         ENTRY;
514
515         LASSERT(oi);
516         LASSERT(oi->oi_inode);
517
518         bag = &oi->oi_dir.od_container;
519         ipd = osd_idx_ipd_get(oti->oti_env, bag);
520         if (IS_ERR(ipd))
521                 RETURN(-ENOMEM);
522
523         /* got ipd now we can start iterator. */
524         iam_it_init(it, bag, 0, ipd);
525
526         rc = iam_it_get(it, (struct iam_key *)key);
527         if (rc > 0)
528                 iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)rec);
529         iam_it_put(it);
530         iam_it_fini(it);
531         osd_ipd_put(oti->oti_env, bag, ipd);
532
533         LINVRNT(osd_invariant(obj));
534
535         RETURN(rc);
536 }
537
538 int fid_is_on_ost(struct osd_thread_info *info, struct osd_device *osd,
539                   const struct lu_fid *fid, enum oi_check_flags flags)
540 {
541         struct lu_seq_range     *range = &info->oti_seq_range;
542         int                     rc;
543         ENTRY;
544
545         if (flags & OI_KNOWN_ON_OST)
546                 RETURN(1);
547
548         if (unlikely(fid_is_local_file(fid) || fid_is_igif(fid) ||
549                      fid_is_llog(fid)) || fid_is_name_llog(fid) ||
550                      fid_is_quota(fid))
551                 RETURN(0);
552
553         if (fid_is_idif(fid) || fid_is_last_id(fid))
554                 RETURN(1);
555
556         if (!(flags & OI_CHECK_FLD))
557                 RETURN(0);
558
559         if (osd_seq_site(osd)->ss_server_fld == NULL)
560                 RETURN(0);
561
562         rc = osd_fld_lookup(info->oti_env, osd, fid_seq(fid), range);
563         if (rc != 0) {
564                 /* During upgrade, OST FLDB might not be loaded because
565                  * OST FLDB is not created until 2.6, so if some DNE
566                  * filesystem upgrade from 2.5 to 2.7/2.8, they will
567                  * not be able to find the sequence from local FLDB
568                  * cache see fld_index_init(). */
569                 if (rc == -ENOENT && osd->od_is_ost)
570                         RETURN(1);
571
572                 if (rc != -ENOENT)
573                         CERROR("%s: lookup FLD "DFID": rc = %d\n",
574                                osd_name(osd), PFID(fid), rc);
575                 RETURN(0);
576         }
577
578         if (fld_range_is_ost(range))
579                 RETURN(1);
580
581         RETURN(0);
582 }
583
584 static int __osd_oi_lookup(struct osd_thread_info *info, struct osd_device *osd,
585                            const struct lu_fid *fid, struct osd_inode_id *id)
586 {
587         struct lu_fid *oi_fid = &info->oti_fid2;
588         int            rc;
589
590         fid_cpu_to_be(oi_fid, fid);
591         rc = osd_oi_iam_lookup(info, osd_fid2oi(osd, fid), (struct dt_rec *)id,
592                                (const struct dt_key *)oi_fid);
593         if (rc > 0) {
594                 osd_id_unpack(id, id);
595                 rc = 0;
596         } else if (rc == 0) {
597                 rc = -ENOENT;
598         }
599         return rc;
600 }
601
602 int osd_oi_lookup(struct osd_thread_info *info, struct osd_device *osd,
603                   const struct lu_fid *fid, struct osd_inode_id *id,
604                   enum oi_check_flags flags)
605 {
606         if (unlikely(fid_is_last_id(fid)))
607                 return osd_obj_spec_lookup(info, osd, fid, id);
608
609         if (fid_is_on_ost(info, osd, fid, flags) || fid_is_llog(fid))
610                 return osd_obj_map_lookup(info, osd, fid, id);
611
612
613         if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE)) {
614                 int rc;
615                 if (fid_is_fs_root(fid)) {
616                         osd_id_gen(id, osd_sb(osd)->s_root->d_inode->i_ino,
617                                    osd_sb(osd)->s_root->d_inode->i_generation);
618                         return 0;
619                 }
620                 if (unlikely(fid_is_acct(fid)))
621                         return osd_acct_obj_lookup(info, osd, fid, id);
622
623                 /* For other special FIDs, try OI first, then do spec lookup */
624                 rc = __osd_oi_lookup(info, osd, fid, id);
625                 if (rc == -ENOENT)
626                         return osd_obj_spec_lookup(info, osd, fid, id);
627                 return rc;
628         }
629
630         if (!osd->od_igif_inoi && fid_is_igif(fid)) {
631                 osd_id_gen(id, lu_igif_ino(fid), lu_igif_gen(fid));
632                 return 0;
633         }
634
635         return __osd_oi_lookup(info, osd, fid, id);
636 }
637
638 static int osd_oi_iam_refresh(struct osd_thread_info *oti, struct osd_oi *oi,
639                              const struct dt_rec *rec, const struct dt_key *key,
640                              handle_t *th, bool insert)
641 {
642         struct iam_container    *bag;
643         struct iam_path_descr   *ipd;
644         int                     rc;
645         ENTRY;
646
647         LASSERT(oi);
648         LASSERT(oi->oi_inode);
649         ll_vfs_dq_init(oi->oi_inode);
650
651         bag = &oi->oi_dir.od_container;
652         ipd = osd_idx_ipd_get(oti->oti_env, bag);
653         if (unlikely(ipd == NULL))
654                 RETURN(-ENOMEM);
655
656         LASSERT(th != NULL);
657         LASSERT(th->h_transaction != NULL);
658         if (insert)
659                 rc = iam_insert(th, bag, (const struct iam_key *)key,
660                                 (const struct iam_rec *)rec, ipd);
661         else
662                 rc = iam_update(th, bag, (const struct iam_key *)key,
663                                 (const struct iam_rec *)rec, ipd);
664         osd_ipd_put(oti->oti_env, bag, ipd);
665         LINVRNT(osd_invariant(obj));
666         RETURN(rc);
667 }
668
669 int osd_oi_insert(struct osd_thread_info *info, struct osd_device *osd,
670                   const struct lu_fid *fid, const struct osd_inode_id *id,
671                   handle_t *th, enum oi_check_flags flags, bool *exist)
672 {
673         struct lu_fid       *oi_fid = &info->oti_fid2;
674         struct osd_inode_id *oi_id  = &info->oti_id2;
675         int                  rc     = 0;
676
677         if (unlikely(fid_is_last_id(fid)))
678                 return osd_obj_spec_insert(info, osd, fid, id, th);
679
680         if (fid_is_on_ost(info, osd, fid, flags) || fid_is_llog(fid))
681                 return osd_obj_map_insert(info, osd, fid, id, th);
682
683         fid_cpu_to_be(oi_fid, fid);
684         osd_id_pack(oi_id, id);
685         rc = osd_oi_iam_refresh(info, osd_fid2oi(osd, fid),
686                                (const struct dt_rec *)oi_id,
687                                (const struct dt_key *)oi_fid, th, true);
688         if (rc != 0) {
689                 struct inode *inode;
690                 struct lustre_mdt_attrs *lma = &info->oti_ost_attrs.loa_lma;
691
692                 if (rc != -EEXIST)
693                         return rc;
694
695                 rc = osd_oi_lookup(info, osd, fid, oi_id, 0);
696                 if (rc != 0)
697                         return rc;
698
699                 if (unlikely(osd_id_eq(id, oi_id)))
700                         return 1;
701
702                 /* Check whether the mapping for oi_id is valid or not. */
703                 inode = osd_iget(info, osd, oi_id);
704                 if (IS_ERR(inode)) {
705                         rc = PTR_ERR(inode);
706                         if (rc == -ENOENT || rc == -ESTALE)
707                                 goto update;
708                         return rc;
709                 }
710
711                 /* The EA inode should NOT be in OI, old OI scrub may added
712                  * such OI mapping by wrong, replace it. */
713                 if (unlikely(osd_is_ea_inode(inode))) {
714                         iput(inode);
715                         goto update;
716                 }
717
718                 rc = osd_get_lma(info, inode, &info->oti_obj_dentry,
719                                  &info->oti_ost_attrs);
720                 iput(inode);
721                 if (rc == -ENODATA)
722                         goto update;
723
724                 if (rc != 0)
725                         return rc;
726
727                 if (!(lma->lma_compat & LMAC_NOT_IN_OI) &&
728                     lu_fid_eq(fid, &lma->lma_self_fid)) {
729                         CERROR("%s: the FID "DFID" is used by two objects: "
730                                "%u/%u %u/%u\n", osd_dev2name(osd),
731                                PFID(fid), oi_id->oii_ino, oi_id->oii_gen,
732                                id->oii_ino, id->oii_gen);
733                         return -EEXIST;
734                 }
735
736 update:
737                 osd_id_pack(oi_id, id);
738                 rc = osd_oi_iam_refresh(info, osd_fid2oi(osd, fid),
739                                         (const struct dt_rec *)oi_id,
740                                         (const struct dt_key *)oi_fid, th, false);
741                 if (rc != 0)
742                         return rc;
743
744                 if (exist != NULL)
745                         *exist = true;
746         }
747
748         if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE))
749                 rc = osd_obj_spec_insert(info, osd, fid, id, th);
750         return rc;
751 }
752
753 static int osd_oi_iam_delete(struct osd_thread_info *oti, struct osd_oi *oi,
754                              const struct dt_key *key, handle_t *th)
755 {
756         struct iam_container    *bag;
757         struct iam_path_descr   *ipd;
758         int                      rc;
759         ENTRY;
760
761         LASSERT(oi);
762         LASSERT(oi->oi_inode);
763         ll_vfs_dq_init(oi->oi_inode);
764
765         bag = &oi->oi_dir.od_container;
766         ipd = osd_idx_ipd_get(oti->oti_env, bag);
767         if (unlikely(ipd == NULL))
768                 RETURN(-ENOMEM);
769
770         LASSERT(th != NULL);
771         LASSERT(th->h_transaction != NULL);
772
773         rc = iam_delete(th, bag, (const struct iam_key *)key, ipd);
774         osd_ipd_put(oti->oti_env, bag, ipd);
775         LINVRNT(osd_invariant(obj));
776         RETURN(rc);
777 }
778
779 int osd_oi_delete(struct osd_thread_info *info,
780                   struct osd_device *osd, const struct lu_fid *fid,
781                   handle_t *th, enum oi_check_flags flags)
782 {
783         struct lu_fid *oi_fid = &info->oti_fid2;
784
785         /* clear idmap cache */
786         if (lu_fid_eq(fid, &info->oti_cache.oic_fid))
787                 fid_zero(&info->oti_cache.oic_fid);
788
789         if (fid_is_last_id(fid))
790                 return 0;
791
792         if (fid_is_on_ost(info, osd, fid, flags) || fid_is_llog(fid))
793                 return osd_obj_map_delete(info, osd, fid, th);
794
795         fid_cpu_to_be(oi_fid, fid);
796         return osd_oi_iam_delete(info, osd_fid2oi(osd, fid),
797                                  (const struct dt_key *)oi_fid, th);
798 }
799
800 int osd_oi_update(struct osd_thread_info *info, struct osd_device *osd,
801                   const struct lu_fid *fid, const struct osd_inode_id *id,
802                   handle_t *th, enum oi_check_flags flags)
803 {
804         struct lu_fid       *oi_fid = &info->oti_fid2;
805         struct osd_inode_id *oi_id  = &info->oti_id2;
806         int                  rc     = 0;
807
808         if (unlikely(fid_is_last_id(fid)))
809                 return osd_obj_spec_update(info, osd, fid, id, th);
810
811         if (fid_is_on_ost(info, osd, fid, flags) || fid_is_llog(fid))
812                 return osd_obj_map_update(info, osd, fid, id, th);
813
814         fid_cpu_to_be(oi_fid, fid);
815         osd_id_pack(oi_id, id);
816         rc = osd_oi_iam_refresh(info, osd_fid2oi(osd, fid),
817                                (const struct dt_rec *)oi_id,
818                                (const struct dt_key *)oi_fid, th, false);
819         if (rc != 0)
820                 return rc;
821
822         if (unlikely(fid_seq(fid) == FID_SEQ_LOCAL_FILE))
823                 rc = osd_obj_spec_update(info, osd, fid, id, th);
824         return rc;
825 }
826
827 int osd_oi_mod_init(void)
828 {
829         if (osd_oi_count == 0 || osd_oi_count > OSD_OI_FID_NR_MAX)
830                 osd_oi_count = OSD_OI_FID_NR;
831
832         if ((osd_oi_count & (osd_oi_count - 1)) != 0) {
833                 LCONSOLE_WARN("Round up oi_count %d to power2 %d\n",
834                               osd_oi_count, size_roundup_power2(osd_oi_count));
835                 osd_oi_count = size_roundup_power2(osd_oi_count);
836         }
837
838         mutex_init(&oi_init_lock);
839         return 0;
840 }