Whamcloud - gitweb
LU-12296 llite: improve ll_dom_lock_cancel
[fs/lustre-release.git] / lustre / lov / lov_object.c
index ac0493e..cc041c0 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2016, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -37,6 +37,8 @@
 
 #define DEBUG_SUBSYSTEM S_LOV
 
+#include <linux/random.h>
+
 #include "lov_cl_internal.h"
 
 static inline struct lov_device *lov_object_dev(struct lov_object *obj)
@@ -74,6 +76,8 @@ struct lov_layout_operations {
                             struct cl_object *obj, struct cl_io *io);
         int  (*llo_getattr)(const struct lu_env *env, struct cl_object *obj,
                             struct cl_attr *attr);
+       int  (*llo_flush)(const struct lu_env *env, struct cl_object *obj,
+                         struct ldlm_lock *lock);
 };
 
 static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov);
@@ -211,9 +215,8 @@ static int lov_init_raid0(const struct lu_env *env, struct lov_device *dev,
 
        spin_lock_init(&r0->lo_sub_lock);
        r0->lo_nr = lse->lsme_stripe_count;
-       LASSERT(r0->lo_nr <= lov_targets_nr(dev));
 
-       OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
+       OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof(r0->lo_sub[0]));
        if (r0->lo_sub == NULL)
                GOTO(out, result = -ENOMEM);
 
@@ -280,14 +283,14 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
 {
        struct cl_object        *sub;
        struct lu_site          *site;
-       struct lu_site_bkt_data *bkt;
-       wait_queue_t          *waiter;
+       wait_queue_head_t *wq;
+       wait_queue_entry_t *waiter;
 
         LASSERT(r0->lo_sub[idx] == los);
 
-        sub  = lovsub2cl(los);
-        site = sub->co_lu.lo_dev->ld_site;
-        bkt  = lu_site_bkt_from_fid(site, &sub->co_lu.lo_header->loh_fid);
+       sub = lovsub2cl(los);
+       site = sub->co_lu.lo_dev->ld_site;
+       wq = lu_site_wq_from_fid(site, &sub->co_lu.lo_header->loh_fid);
 
         cl_object_kill(env, sub);
         /* release a reference to the sub-object and ... */
@@ -299,7 +302,7 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
        if (r0->lo_sub[idx] == los) {
                waiter = &lov_env_info(env)->lti_waiter;
                init_waitqueue_entry(waiter, current);
-               add_wait_queue(&bkt->lsb_marche_funebre, waiter);
+               add_wait_queue(wq, waiter);
                set_current_state(TASK_UNINTERRUPTIBLE);
                while (1) {
                        /* this wait-queue is signaled at the end of
@@ -315,7 +318,7 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
                                break;
                        }
                }
-               remove_wait_queue(&bkt->lsb_marche_funebre, waiter);
+               remove_wait_queue(wq, waiter);
        }
        LASSERT(r0->lo_sub[idx] == NULL);
 }
@@ -451,11 +454,11 @@ static int lov_attr_get_dom(const struct lu_env *env, struct lov_object *lov,
        cl_lvb2attr(attr, &loi->loi_lvb);
 
        /* DoM component size can be bigger than stripe size after
-        * client's setattr RPC, so do not count anything beyound
+        * client's setattr RPC, so do not count anything beyond
         * component end. Alternatively, check that limit on server
         * and do not allow size overflow there. */
-       if (attr->cat_size > lle->lle_extent.e_end)
-               attr->cat_size = lle->lle_extent.e_end;
+       if (attr->cat_size > lle->lle_extent->e_end)
+               attr->cat_size = lle->lle_extent->e_end;
 
        attr->cat_kms = attr->cat_size;
 
@@ -629,10 +632,14 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
 {
        struct lov_layout_composite *comp = &state->composite;
        struct lov_layout_entry *lle;
+       struct lov_mirror_entry *lre;
        unsigned int entry_count;
        unsigned int psz = 0;
+       unsigned int mirror_count;
+       int flr_state = lsm->lsm_flags & LCM_FL_FLR_MASK;
        int result = 0;
-       int i;
+       unsigned int seq;
+       int i, j;
 
        ENTRY;
 
@@ -641,18 +648,36 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
        lov->lo_lsm = lsm_addref(lsm);
        lov->lo_layout_invalid = true;
 
+       dump_lsm(D_INODE, lsm);
+
        entry_count = lsm->lsm_entry_count;
-       comp->lo_entry_count = entry_count;
+
+       spin_lock_init(&comp->lo_write_lock);
+       comp->lo_flags = lsm->lsm_flags;
+       comp->lo_mirror_count = lsm->lsm_mirror_count + 1;
+       comp->lo_entry_count = lsm->lsm_entry_count;
+       comp->lo_preferred_mirror = -1;
+
+       if (equi(flr_state == LCM_FL_NONE, comp->lo_mirror_count > 1))
+               RETURN(-EINVAL);
+
+       OBD_ALLOC(comp->lo_mirrors,
+                 comp->lo_mirror_count * sizeof(*comp->lo_mirrors));
+       if (comp->lo_mirrors == NULL)
+               RETURN(-ENOMEM);
 
        OBD_ALLOC(comp->lo_entries, entry_count * sizeof(*comp->lo_entries));
        if (comp->lo_entries == NULL)
                RETURN(-ENOMEM);
 
        /* Initiate all entry types and extents data at first */
-       for (i = 0; i < entry_count; i++) {
+       for (i = 0, j = 0, mirror_count = 1; i < entry_count; i++) {
+               int mirror_id = 0;
+
                lle = &comp->lo_entries[i];
 
-               lle->lle_type = lov_entry_type(lsm->lsm_entries[i]);
+               lle->lle_lsme = lsm->lsm_entries[i];
+               lle->lle_type = lov_entry_type(lle->lle_lsme);
                switch (lle->lle_type) {
                case LOV_PATTERN_RAID0:
                        lle->lle_comp_ops = &raid0_ops;
@@ -667,30 +692,102 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
                        dump_lsm(D_ERROR, lsm);
                        RETURN(-EIO);
                }
-               lle->lle_extent = lsm->lsm_entries[i]->lsme_extent;
+
+               lle->lle_extent = &lle->lle_lsme->lsme_extent;
+               lle->lle_valid = !(lle->lle_lsme->lsme_flags & LCME_FL_STALE);
+
+               if (flr_state != LCM_FL_NONE)
+                       mirror_id = mirror_id_of(lle->lle_lsme->lsme_id);
+
+               lre = &comp->lo_mirrors[j];
+               if (i > 0) {
+                       if (mirror_id == lre->lre_mirror_id) {
+                               lre->lre_valid |= lle->lle_valid;
+                               lre->lre_stale |= !lle->lle_valid;
+                               lre->lre_end = i;
+                               continue;
+                       }
+
+                       /* new mirror detected, assume that the mirrors
+                        * are sorted in layout */
+                       ++mirror_count;
+                       ++j;
+                       if (j >= comp->lo_mirror_count)
+                               break;
+
+                       lre = &comp->lo_mirrors[j];
+               }
+
+               /* entries must be sorted by mirrors */
+               lre->lre_mirror_id = mirror_id;
+               lre->lre_start = lre->lre_end = i;
+               lre->lre_preferred = !!(lle->lle_lsme->lsme_flags &
+                                       LCME_FL_PREF_RD);
+               lre->lre_valid = lle->lle_valid;
+               lre->lre_stale = !lle->lle_valid;
+       }
+
+       /* sanity check for FLR */
+       if (mirror_count != comp->lo_mirror_count) {
+               CDEBUG(D_INODE, DFID
+                      " doesn't have the # of mirrors it claims, %u/%u\n",
+                      PFID(lu_object_fid(lov2lu(lov))), mirror_count,
+                      comp->lo_mirror_count + 1);
+
+               GOTO(out, result = -EINVAL);
        }
 
-       i = 0;
        lov_foreach_layout_entry(lov, lle) {
+               int index = lov_layout_entry_index(lov, lle);
+
                /**
                 * If the component has not been init-ed on MDS side, for
                 * PFL layout, we'd know that the components beyond this one
                 * will be dynamically init-ed later on file write/trunc ops.
                 */
-               if (lsm_entry_inited(lsm, i)) {
-                       result = lle->lle_comp_ops->lco_init(env, dev, lov, i,
-                                                            conf, lle);
-                       if (result < 0)
-                               break;
+               if (!lsme_inited(lle->lle_lsme))
+                       continue;
 
-                       LASSERT(ergo(psz > 0, psz == result));
-                       psz = result;
-               }
-               i++;
+               result = lle->lle_comp_ops->lco_init(env, dev, lov, index,
+                                                    conf, lle);
+               if (result < 0)
+                       break;
+
+               LASSERT(ergo(psz > 0, psz == result));
+               psz = result;
        }
+
        if (psz > 0)
                cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
 
+       /* decide the preferred mirror. It uses the hash value of lov_object
+        * so that different clients would use different mirrors for read. */
+       mirror_count = 0;
+       seq = hash_long((unsigned long)lov, 8);
+       for (i = 0; i < comp->lo_mirror_count; i++) {
+               unsigned int idx = (i + seq) % comp->lo_mirror_count;
+
+               lre = lov_mirror_entry(lov, idx);
+               if (lre->lre_stale)
+                       continue;
+
+               mirror_count++; /* valid mirror */
+
+               if (lre->lre_preferred || comp->lo_preferred_mirror < 0)
+                       comp->lo_preferred_mirror = idx;
+       }
+       if (!mirror_count) {
+               CDEBUG(D_INODE, DFID
+                      " doesn't have any valid mirrors\n",
+                      PFID(lu_object_fid(lov2lu(lov))));
+
+               comp->lo_preferred_mirror = 0;
+       }
+
+       LASSERT(comp->lo_preferred_mirror >= 0);
+
+       EXIT;
+out:
        return result > 0 ? 0 : result;
 }
 
@@ -716,10 +813,25 @@ static int lov_init_released(const struct lu_env *env,
        return 0;
 }
 
+static int lov_init_foreign(const struct lu_env *env,
+                           struct lov_device *dev, struct lov_object *lov,
+                           struct lov_stripe_md *lsm,
+                           const struct cl_object_conf *conf,
+                           union lov_layout_state *state)
+{
+       LASSERT(lsm != NULL);
+       LASSERT(lov->lo_type == LLT_FOREIGN);
+       LASSERT(lov->lo_lsm == NULL);
+
+       lov->lo_lsm = lsm_addref(lsm);
+       return 0;
+}
+
 static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
                            union lov_layout_state *state)
 {
-       LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
+       LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED ||
+               lov->lo_type == LLT_FOREIGN);
 
        lov_layout_wait(env, lov);
        return 0;
@@ -768,6 +880,14 @@ static void lov_fini_composite(const struct lu_env *env,
                comp->lo_entries = NULL;
        }
 
+       if (comp->lo_mirrors != NULL) {
+               OBD_FREE(comp->lo_mirrors,
+                        comp->lo_mirror_count * sizeof(*comp->lo_mirrors));
+               comp->lo_mirrors = NULL;
+       }
+
+       memset(comp, 0, sizeof(*comp));
+
        dump_lsm(D_INODE, lov->lo_lsm);
        lov_free_memmd(&lov->lo_lsm);
 
@@ -833,6 +953,23 @@ static int lov_print_released(const struct lu_env *env, void *cookie,
        return 0;
 }
 
+static int lov_print_foreign(const struct lu_env *env, void *cookie,
+                               lu_printer_t p, const struct lu_object *o)
+{
+       struct lov_object       *lov = lu2lov(o);
+       struct lov_stripe_md    *lsm = lov->lo_lsm;
+
+       (*p)(env, cookie,
+               "foreign: %s, lsm{%p 0x%08X %d %u}:\n",
+               lov->lo_layout_invalid ? "invalid" : "valid", lsm,
+               lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
+               lsm->lsm_layout_gen);
+       (*p)(env, cookie,
+               "raw_ea_content '%.*s'\n",
+               (int)lsm->lsm_foreign_size, (char *)lsm_foreign(lsm));
+       return 0;
+}
+
 /**
  * Implements cl_object_operations::coo_attr_get() method for an object
  * without stripes (LLT_EMPTY layout type).
@@ -854,7 +991,6 @@ static int lov_attr_get_composite(const struct lu_env *env,
        struct lov_object       *lov = cl2lov(obj);
        struct lov_layout_entry *entry;
        int                      result = 0;
-       int                      index = 0;
 
        ENTRY;
 
@@ -862,18 +998,20 @@ static int lov_attr_get_composite(const struct lu_env *env,
        attr->cat_blocks = 0;
        lov_foreach_layout_entry(lov, entry) {
                struct cl_attr *lov_attr = NULL;
+               int index = lov_layout_entry_index(lov, entry);
+
+               if (!entry->lle_valid)
+                       continue;
 
                /* PFL: This component has not been init-ed. */
                if (!lsm_entry_inited(lov->lo_lsm, index))
-                       break;
+                       continue;
 
                result = entry->lle_comp_ops->lco_getattr(env, lov, index,
                                                          entry, &lov_attr);
                if (result < 0)
                        RETURN(result);
 
-               index++;
-
                if (lov_attr == NULL)
                        continue;
 
@@ -895,9 +1033,26 @@ static int lov_attr_get_composite(const struct lu_env *env,
                if (attr->cat_mtime < lov_attr->cat_mtime)
                        attr->cat_mtime = lov_attr->cat_mtime;
        }
+
        RETURN(0);
 }
 
+static int lov_flush_composite(const struct lu_env *env,
+                              struct cl_object *obj,
+                              struct ldlm_lock *lock)
+{
+       struct lov_object *lov = cl2lov(obj);
+       struct lovsub_object *lovsub;
+
+       ENTRY;
+
+       if (!lsme_is_dom(lov->lo_lsm->lsm_entries[0]))
+               RETURN(-EINVAL);
+
+       lovsub = lov->u.composite.lo_entries[0].lle_dom.lo_dom;
+       RETURN(cl_object_flush(env, lovsub2cl(lovsub), lock));
+}
+
 const static struct lov_layout_operations lov_dispatch[] = {
        [LLT_EMPTY] = {
                .llo_init      = lov_init_empty,
@@ -928,6 +1083,17 @@ const static struct lov_layout_operations lov_dispatch[] = {
                .llo_lock_init = lov_lock_init_composite,
                .llo_io_init   = lov_io_init_composite,
                .llo_getattr   = lov_attr_get_composite,
+               .llo_flush     = lov_flush_composite,
+       },
+       [LLT_FOREIGN] = {
+               .llo_init      = lov_init_foreign,
+               .llo_delete    = lov_delete_empty,
+               .llo_fini      = lov_fini_released,
+               .llo_print     = lov_print_foreign,
+               .llo_page_init = lov_page_init_foreign,
+               .llo_lock_init = lov_lock_init_empty,
+               .llo_io_init   = lov_io_init_empty,
+               .llo_getattr   = lov_attr_get_empty,
        },
 };
 
@@ -960,6 +1126,9 @@ static enum lov_layout_type lov_type(struct lov_stripe_md *lsm)
            lsm->lsm_magic == LOV_MAGIC_COMP_V1)
                return LLT_COMP;
 
+       if (lsm->lsm_magic == LOV_MAGIC_FOREIGN)
+               return LLT_FOREIGN;
+
        return LLT_EMPTY;
 }
 
@@ -1089,12 +1258,11 @@ static int lov_layout_change(const struct lu_env *unused,
        CDEBUG(D_INODE, DFID "Apply new layout lov %p, type %d\n",
               PFID(lu_object_fid(lov2lu(lov))), lov, llt);
 
-       lov->lo_type = LLT_EMPTY;
-
        /* page bufsize fixup */
        cl_object_header(&lov->lo_cl)->coh_page_bufsize -=
                lov_page_slice_fixup(lov, NULL);
 
+       lov->lo_type = llt;
        rc = new_ops->llo_init(env, lov_dev, lov, lsm, conf, state);
        if (rc != 0) {
                struct obd_device *obd = lov2obd(lov_dev->ld_lov);
@@ -1104,11 +1272,10 @@ static int lov_layout_change(const struct lu_env *unused,
                new_ops->llo_delete(env, lov, state);
                new_ops->llo_fini(env, lov, state);
                /* this file becomes an EMPTY file. */
+               lov->lo_type = LLT_EMPTY;
                GOTO(out, rc);
        }
 
-       lov->lo_type = llt;
-
 out:
        cl_env_put(env, &refcheck);
        RETURN(rc);
@@ -1264,14 +1431,19 @@ int lov_page_init(const struct lu_env *env, struct cl_object *obj,
 int lov_io_init(const struct lu_env *env, struct cl_object *obj,
                struct cl_io *io)
 {
-       CL_IO_SLICE_CLEAN(lov_env_io(env), lis_cl);
+       CL_IO_SLICE_CLEAN(lov_env_io(env), lis_preserved);
 
        CDEBUG(D_INODE, DFID "io %p type %d ignore/verify layout %d/%d\n",
               PFID(lu_object_fid(&obj->co_lu)), io, io->ci_type,
               io->ci_ignore_layout, io->ci_verify_layout);
 
+       /* IOs of type CIT_MISC with ci_ignore_layout set are usually invoked
+        * from the OSC layer. They shouldn't take the lov layout conf lock in
+        * that case, because as long as the OSC object exists, the layout
+        * can't be reconfigured. */
        return LOV_2DISPATCH_MAYLOCK(cl2lov(obj), llo_io_init,
-                                    !io->ci_ignore_layout, env, obj, io);
+                       !(io->ci_ignore_layout && io->ci_type == CIT_MISC),
+                       env, obj, io);
 }
 
 /**
@@ -1538,7 +1710,7 @@ int fiemap_for_stripe(const struct lu_env *env, struct cl_object *obj,
        if (lun_start == lun_end)
                return 0;
 
-       req_fm_len = obd_object_end - lun_start;
+       req_fm_len = obd_object_end - lun_start + 1;
        fs->fs_fm->fm_length = 0;
        len_mapped_single_call = 0;
 
@@ -1581,7 +1753,7 @@ int fiemap_for_stripe(const struct lu_env *env, struct cl_object *obj,
                        fs->fs_fm->fm_mapped_extents = 1;
 
                        fm_ext[0].fe_logical = lun_start;
-                       fm_ext[0].fe_length = obd_object_end - lun_start;
+                       fm_ext[0].fe_length = obd_object_end - lun_start + 1;
                        fm_ext[0].fe_flags |= FIEMAP_EXTENT_UNKNOWN;
 
                        goto inactive_tgt;
@@ -1696,8 +1868,11 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
        ENTRY;
 
        lsm = lov_lsm_addref(cl2lov(obj));
-       if (lsm == NULL)
-               RETURN(-ENODATA);
+       if (lsm == NULL) {
+               /* no extent: there is no object for mapping */
+               fiemap->fm_mapped_extents = 0;
+               return 0;
+       }
 
        if (!(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER)) {
                /**
@@ -1781,6 +1956,7 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
        if (start_entry == -1 || end_entry == -1)
                GOTO(out_fm_local, rc = -EINVAL);
 
+       /* TODO: rewrite it with lov_foreach_io_layout() */
        for (entry = start_entry; entry <= end_entry; entry++) {
                lsme = lsm->lsm_entries[entry];
 
@@ -1886,7 +2062,18 @@ static int lov_object_layout_get(const struct lu_env *env,
 
        cl->cl_size = lov_comp_md_size(lsm);
        cl->cl_layout_gen = lsm->lsm_layout_gen;
-       cl->cl_is_composite = lsm_is_composite(lsm->lsm_magic);
+       cl->cl_dom_comp_size = 0;
+       cl->cl_is_released = lsm->lsm_is_released;
+       if (lsm_is_composite(lsm->lsm_magic)) {
+               struct lov_stripe_md_entry *lsme = lsm->lsm_entries[0];
+
+               cl->cl_is_composite = true;
+
+               if (lsme_is_dom(lsme))
+                       cl->cl_dom_comp_size = lsme->lsme_extent.e_end;
+       } else {
+               cl->cl_is_composite = false;
+       }
 
        rc = lov_lsm_pack(lsm, buf->lb_buf, buf->lb_len);
        lov_lsm_put(lsm);
@@ -1910,6 +2097,12 @@ static loff_t lov_object_maxbytes(struct cl_object *obj)
        return maxbytes;
 }
 
+static int lov_object_flush(const struct lu_env *env, struct cl_object *obj,
+                           struct ldlm_lock *lock)
+{
+       return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_flush, env, obj, lock);
+}
+
 static const struct cl_object_operations lov_ops = {
        .coo_page_init    = lov_page_init,
        .coo_lock_init    = lov_lock_init,
@@ -1921,6 +2114,7 @@ static const struct cl_object_operations lov_ops = {
        .coo_layout_get   = lov_object_layout_get,
        .coo_maxbytes     = lov_object_maxbytes,
        .coo_fiemap       = lov_object_fiemap,
+       .coo_object_flush = lov_object_flush
 };
 
 static const struct lu_object_operations lov_lu_obj_ops = {
@@ -2014,6 +2208,8 @@ int lov_read_and_clear_async_rc(struct cl_object *clob)
                }
                case LLT_RELEASED:
                case LLT_EMPTY:
+                       /* fall through */
+               case LLT_FOREIGN:
                        break;
                default:
                        LBUG();