Whamcloud - gitweb
LU-11796 lov: Remove unnecessary assert
[fs/lustre-release.git] / lustre / lov / lov_object.c
index 0b65bc7..200f943 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2016, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -37,6 +37,8 @@
 
 #define DEBUG_SUBSYSTEM S_LOV
 
+#include <linux/random.h>
+
 #include "lov_cl_internal.h"
 
 static inline struct lov_device *lov_object_dev(struct lov_object *obj)
@@ -211,9 +213,8 @@ static int lov_init_raid0(const struct lu_env *env, struct lov_device *dev,
 
        spin_lock_init(&r0->lo_sub_lock);
        r0->lo_nr = lse->lsme_stripe_count;
-       LASSERT(r0->lo_nr <= lov_targets_nr(dev));
 
-       OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
+       OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof(r0->lo_sub[0]));
        if (r0->lo_sub == NULL)
                GOTO(out, result = -ENOMEM);
 
@@ -280,14 +281,14 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
 {
        struct cl_object        *sub;
        struct lu_site          *site;
-       struct lu_site_bkt_data *bkt;
-       wait_queue_t          *waiter;
+       wait_queue_head_t *wq;
+       wait_queue_entry_t *waiter;
 
         LASSERT(r0->lo_sub[idx] == los);
 
-        sub  = lovsub2cl(los);
-        site = sub->co_lu.lo_dev->ld_site;
-        bkt  = lu_site_bkt_from_fid(site, &sub->co_lu.lo_header->loh_fid);
+       sub = lovsub2cl(los);
+       site = sub->co_lu.lo_dev->ld_site;
+       wq = lu_site_wq_from_fid(site, &sub->co_lu.lo_header->loh_fid);
 
         cl_object_kill(env, sub);
         /* release a reference to the sub-object and ... */
@@ -299,7 +300,7 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
        if (r0->lo_sub[idx] == los) {
                waiter = &lov_env_info(env)->lti_waiter;
                init_waitqueue_entry(waiter, current);
-               add_wait_queue(&bkt->lsb_marche_funebre, waiter);
+               add_wait_queue(wq, waiter);
                set_current_state(TASK_UNINTERRUPTIBLE);
                while (1) {
                        /* this wait-queue is signaled at the end of
@@ -315,7 +316,7 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
                                break;
                        }
                }
-               remove_wait_queue(&bkt->lsb_marche_funebre, waiter);
+               remove_wait_queue(wq, waiter);
        }
        LASSERT(r0->lo_sub[idx] == NULL);
 }
@@ -454,8 +455,8 @@ static int lov_attr_get_dom(const struct lu_env *env, struct lov_object *lov,
         * client's setattr RPC, so do not count anything beyond
         * component end. Alternatively, check that limit on server
         * and do not allow size overflow there. */
-       if (attr->cat_size > lle->lle_extent.e_end)
-               attr->cat_size = lle->lle_extent.e_end;
+       if (attr->cat_size > lle->lle_extent->e_end)
+               attr->cat_size = lle->lle_extent->e_end;
 
        attr->cat_kms = attr->cat_size;
 
@@ -629,10 +630,14 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
 {
        struct lov_layout_composite *comp = &state->composite;
        struct lov_layout_entry *lle;
+       struct lov_mirror_entry *lre;
        unsigned int entry_count;
        unsigned int psz = 0;
+       unsigned int mirror_count;
+       int flr_state = lsm->lsm_flags & LCM_FL_FLR_MASK;
        int result = 0;
-       int i;
+       unsigned int seq;
+       int i, j;
 
        ENTRY;
 
@@ -641,18 +646,36 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
        lov->lo_lsm = lsm_addref(lsm);
        lov->lo_layout_invalid = true;
 
+       dump_lsm(D_INODE, lsm);
+
        entry_count = lsm->lsm_entry_count;
-       comp->lo_entry_count = entry_count;
+
+       spin_lock_init(&comp->lo_write_lock);
+       comp->lo_flags = lsm->lsm_flags;
+       comp->lo_mirror_count = lsm->lsm_mirror_count + 1;
+       comp->lo_entry_count = lsm->lsm_entry_count;
+       comp->lo_preferred_mirror = -1;
+
+       if (equi(flr_state == LCM_FL_NONE, comp->lo_mirror_count > 1))
+               RETURN(-EINVAL);
+
+       OBD_ALLOC(comp->lo_mirrors,
+                 comp->lo_mirror_count * sizeof(*comp->lo_mirrors));
+       if (comp->lo_mirrors == NULL)
+               RETURN(-ENOMEM);
 
        OBD_ALLOC(comp->lo_entries, entry_count * sizeof(*comp->lo_entries));
        if (comp->lo_entries == NULL)
                RETURN(-ENOMEM);
 
        /* Initiate all entry types and extents data at first */
-       for (i = 0; i < entry_count; i++) {
+       for (i = 0, j = 0, mirror_count = 1; i < entry_count; i++) {
+               int mirror_id = 0;
+
                lle = &comp->lo_entries[i];
 
-               lle->lle_type = lov_entry_type(lsm->lsm_entries[i]);
+               lle->lle_lsme = lsm->lsm_entries[i];
+               lle->lle_type = lov_entry_type(lle->lle_lsme);
                switch (lle->lle_type) {
                case LOV_PATTERN_RAID0:
                        lle->lle_comp_ops = &raid0_ops;
@@ -667,30 +690,102 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
                        dump_lsm(D_ERROR, lsm);
                        RETURN(-EIO);
                }
-               lle->lle_extent = lsm->lsm_entries[i]->lsme_extent;
+
+               lle->lle_extent = &lle->lle_lsme->lsme_extent;
+               lle->lle_valid = !(lle->lle_lsme->lsme_flags & LCME_FL_STALE);
+
+               if (flr_state != LCM_FL_NONE)
+                       mirror_id = mirror_id_of(lle->lle_lsme->lsme_id);
+
+               lre = &comp->lo_mirrors[j];
+               if (i > 0) {
+                       if (mirror_id == lre->lre_mirror_id) {
+                               lre->lre_valid |= lle->lle_valid;
+                               lre->lre_stale |= !lle->lle_valid;
+                               lre->lre_end = i;
+                               continue;
+                       }
+
+                       /* new mirror detected, assume that the mirrors
+                        * are shorted in layout */
+                       ++mirror_count;
+                       ++j;
+                       if (j >= comp->lo_mirror_count)
+                               break;
+
+                       lre = &comp->lo_mirrors[j];
+               }
+
+               /* entries must be sorted by mirrors */
+               lre->lre_mirror_id = mirror_id;
+               lre->lre_start = lre->lre_end = i;
+               lre->lre_preferred = !!(lle->lle_lsme->lsme_flags &
+                                       LCME_FL_PREF_RD);
+               lre->lre_valid = lle->lle_valid;
+               lre->lre_stale = !lle->lle_valid;
+       }
+
+       /* sanity check for FLR */
+       if (mirror_count != comp->lo_mirror_count) {
+               CDEBUG(D_INODE, DFID
+                      " doesn't have the # of mirrors it claims, %u/%u\n",
+                      PFID(lu_object_fid(lov2lu(lov))), mirror_count,
+                      comp->lo_mirror_count + 1);
+
+               GOTO(out, result = -EINVAL);
        }
 
-       i = 0;
        lov_foreach_layout_entry(lov, lle) {
+               int index = lov_layout_entry_index(lov, lle);
+
                /**
                 * If the component has not been init-ed on MDS side, for
                 * PFL layout, we'd know that the components beyond this one
                 * will be dynamically init-ed later on file write/trunc ops.
                 */
-               if (lsm_entry_inited(lsm, i)) {
-                       result = lle->lle_comp_ops->lco_init(env, dev, lov, i,
-                                                            conf, lle);
-                       if (result < 0)
-                               break;
+               if (!lsme_inited(lle->lle_lsme))
+                       continue;
 
-                       LASSERT(ergo(psz > 0, psz == result));
-                       psz = result;
-               }
-               i++;
+               result = lle->lle_comp_ops->lco_init(env, dev, lov, index,
+                                                    conf, lle);
+               if (result < 0)
+                       break;
+
+               LASSERT(ergo(psz > 0, psz == result));
+               psz = result;
        }
+
        if (psz > 0)
                cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
 
+       /* decide the preferred mirror. It uses the hash value of lov_object
+        * so that different clients would use different mirrors for read. */
+       mirror_count = 0;
+       seq = hash_long((unsigned long)lov, 8);
+       for (i = 0; i < comp->lo_mirror_count; i++) {
+               unsigned int idx = (i + seq) % comp->lo_mirror_count;
+
+               lre = lov_mirror_entry(lov, idx);
+               if (lre->lre_stale)
+                       continue;
+
+               mirror_count++; /* valid mirror */
+
+               if (lre->lre_preferred || comp->lo_preferred_mirror < 0)
+                       comp->lo_preferred_mirror = idx;
+       }
+       if (!mirror_count) {
+               CDEBUG(D_INODE, DFID
+                      " doesn't have any valid mirrors\n",
+                      PFID(lu_object_fid(lov2lu(lov))));
+
+               comp->lo_preferred_mirror = 0;
+       }
+
+       LASSERT(comp->lo_preferred_mirror >= 0);
+
+       EXIT;
+out:
        return result > 0 ? 0 : result;
 }
 
@@ -768,6 +863,14 @@ static void lov_fini_composite(const struct lu_env *env,
                comp->lo_entries = NULL;
        }
 
+       if (comp->lo_mirrors != NULL) {
+               OBD_FREE(comp->lo_mirrors,
+                        comp->lo_mirror_count * sizeof(*comp->lo_mirrors));
+               comp->lo_mirrors = NULL;
+       }
+
+       memset(comp, 0, sizeof(*comp));
+
        dump_lsm(D_INODE, lov->lo_lsm);
        lov_free_memmd(&lov->lo_lsm);
 
@@ -854,7 +957,6 @@ static int lov_attr_get_composite(const struct lu_env *env,
        struct lov_object       *lov = cl2lov(obj);
        struct lov_layout_entry *entry;
        int                      result = 0;
-       int                      index = 0;
 
        ENTRY;
 
@@ -862,18 +964,20 @@ static int lov_attr_get_composite(const struct lu_env *env,
        attr->cat_blocks = 0;
        lov_foreach_layout_entry(lov, entry) {
                struct cl_attr *lov_attr = NULL;
+               int index = lov_layout_entry_index(lov, entry);
+
+               if (!entry->lle_valid)
+                       continue;
 
                /* PFL: This component has not been init-ed. */
                if (!lsm_entry_inited(lov->lo_lsm, index))
-                       break;
+                       continue;
 
                result = entry->lle_comp_ops->lco_getattr(env, lov, index,
                                                          entry, &lov_attr);
                if (result < 0)
                        RETURN(result);
 
-               index++;
-
                if (lov_attr == NULL)
                        continue;
 
@@ -895,6 +999,7 @@ static int lov_attr_get_composite(const struct lu_env *env,
                if (attr->cat_mtime < lov_attr->cat_mtime)
                        attr->cat_mtime = lov_attr->cat_mtime;
        }
+
        RETURN(0);
 }
 
@@ -1089,12 +1194,11 @@ static int lov_layout_change(const struct lu_env *unused,
        CDEBUG(D_INODE, DFID "Apply new layout lov %p, type %d\n",
               PFID(lu_object_fid(lov2lu(lov))), lov, llt);
 
-       lov->lo_type = LLT_EMPTY;
-
        /* page bufsize fixup */
        cl_object_header(&lov->lo_cl)->coh_page_bufsize -=
                lov_page_slice_fixup(lov, NULL);
 
+       lov->lo_type = llt;
        rc = new_ops->llo_init(env, lov_dev, lov, lsm, conf, state);
        if (rc != 0) {
                struct obd_device *obd = lov2obd(lov_dev->ld_lov);
@@ -1104,11 +1208,10 @@ static int lov_layout_change(const struct lu_env *unused,
                new_ops->llo_delete(env, lov, state);
                new_ops->llo_fini(env, lov, state);
                /* this file becomes an EMPTY file. */
+               lov->lo_type = LLT_EMPTY;
                GOTO(out, rc);
        }
 
-       lov->lo_type = llt;
-
 out:
        cl_env_put(env, &refcheck);
        RETURN(rc);
@@ -1264,14 +1367,19 @@ int lov_page_init(const struct lu_env *env, struct cl_object *obj,
 int lov_io_init(const struct lu_env *env, struct cl_object *obj,
                struct cl_io *io)
 {
-       CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);
+       CL_IO_SLICE_CLEAN(lov_env_io(env), lis_preserved);
 
        CDEBUG(D_INODE, DFID "io %p type %d ignore/verify layout %d/%d\n",
               PFID(lu_object_fid(&obj->co_lu)), io, io->ci_type,
               io->ci_ignore_layout, io->ci_verify_layout);
 
+       /* IO type CIT_MISC with ci_ignore_layout set are usually invoked from
+        * the OSC layer. It shouldn't take lov layout conf lock in that case,
+        * because as long as the OSC object exists, the layout can't be
+        * reconfigured. */
        return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_io_init,
-                                    !io->ci_ignore_layout, env, obj, io);
+                       !(io->ci_ignore_layout && io->ci_type == CIT_MISC),
+                       env, obj, io);
 }
 
 /**
@@ -1696,8 +1804,11 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
        ENTRY;
 
        lsm = lov_lsm_addref(cl2lov(obj));
-       if (lsm == NULL)
-               RETURN(-ENODATA);
+       if (lsm == NULL) {
+               /* no extent: there is no object for mapping */
+               fiemap->fm_mapped_extents = 0;
+               return 0;
+       }
 
        if (!(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER)) {
                /**
@@ -1781,6 +1892,7 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
        if (start_entry == -1 || end_entry == -1)
                GOTO(out_fm_local, rc = -EINVAL);
 
+       /* TODO: rewrite it with lov_foreach_io_layout() */
        for (entry = start_entry; entry <= end_entry; entry++) {
                lsme = lsm->lsm_entries[entry];
 
@@ -1886,6 +1998,7 @@ static int lov_object_layout_get(const struct lu_env *env,
 
        cl->cl_size = lov_comp_md_size(lsm);
        cl->cl_layout_gen = lsm->lsm_layout_gen;
+       cl->cl_dom_comp_size = 0;
        if (lsm_is_composite(lsm->lsm_magic)) {
                struct lov_stripe_md_entry *lsme = lsm->lsm_entries[0];
 
@@ -1893,7 +2006,10 @@ static int lov_object_layout_get(const struct lu_env *env,
 
                if (lsme_is_dom(lsme))
                        cl->cl_dom_comp_size = lsme->lsme_extent.e_end;
+       } else {
+               cl->cl_is_composite = false;
        }
+
        rc = lov_lsm_pack(lsm, buf->lb_buf, buf->lb_len);
        lov_lsm_put(lsm);