Whamcloud - gitweb
LU-12296 llite: improve ll_dom_lock_cancel
[fs/lustre-release.git] / lustre / lov / lov_object.c
index 15d5c3c..cc041c0 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright (c) 2008, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2016, Intel Corporation.
+ * Copyright (c) 2011, 2017, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -37,6 +37,8 @@
 
 #define DEBUG_SUBSYSTEM S_LOV
 
+#include <linux/random.h>
+
 #include "lov_cl_internal.h"
 
 static inline struct lov_device *lov_object_dev(struct lov_object *obj)
@@ -74,6 +76,8 @@ struct lov_layout_operations {
                             struct cl_object *obj, struct cl_io *io);
         int  (*llo_getattr)(const struct lu_env *env, struct cl_object *obj,
                             struct cl_attr *attr);
+       int  (*llo_flush)(const struct lu_env *env, struct cl_object *obj,
+                         struct ldlm_lock *lock);
 };
 
 static int lov_layout_wait(const struct lu_env *env, struct lov_object *lov);
@@ -211,9 +215,8 @@ static int lov_init_raid0(const struct lu_env *env, struct lov_device *dev,
 
        spin_lock_init(&r0->lo_sub_lock);
        r0->lo_nr = lse->lsme_stripe_count;
-       LASSERT(r0->lo_nr <= lov_targets_nr(dev));
 
-       OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof r0->lo_sub[0]);
+       OBD_ALLOC_LARGE(r0->lo_sub, r0->lo_nr * sizeof(r0->lo_sub[0]));
        if (r0->lo_sub == NULL)
                GOTO(out, result = -ENOMEM);
 
@@ -280,14 +283,14 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
 {
        struct cl_object        *sub;
        struct lu_site          *site;
-       struct lu_site_bkt_data *bkt;
-       wait_queue_t          *waiter;
+       wait_queue_head_t *wq;
+       wait_queue_entry_t *waiter;
 
         LASSERT(r0->lo_sub[idx] == los);
 
-        sub  = lovsub2cl(los);
-        site = sub->co_lu.lo_dev->ld_site;
-        bkt  = lu_site_bkt_from_fid(site, &sub->co_lu.lo_header->loh_fid);
+       sub = lovsub2cl(los);
+       site = sub->co_lu.lo_dev->ld_site;
+       wq = lu_site_wq_from_fid(site, &sub->co_lu.lo_header->loh_fid);
 
         cl_object_kill(env, sub);
         /* release a reference to the sub-object and ... */
@@ -299,7 +302,7 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
        if (r0->lo_sub[idx] == los) {
                waiter = &lov_env_info(env)->lti_waiter;
                init_waitqueue_entry(waiter, current);
-               add_wait_queue(&bkt->lsb_marche_funebre, waiter);
+               add_wait_queue(wq, waiter);
                set_current_state(TASK_UNINTERRUPTIBLE);
                while (1) {
                        /* this wait-queue is signaled at the end of
@@ -315,7 +318,7 @@ static void lov_subobject_kill(const struct lu_env *env, struct lov_object *lov,
                                break;
                        }
                }
-               remove_wait_queue(&bkt->lsb_marche_funebre, waiter);
+               remove_wait_queue(wq, waiter);
        }
        LASSERT(r0->lo_sub[idx] == NULL);
 }
@@ -635,6 +638,7 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
        unsigned int mirror_count;
        int flr_state = lsm->lsm_flags & LCM_FL_FLR_MASK;
        int result = 0;
+       unsigned int seq;
        int i, j;
 
        ENTRY;
@@ -654,7 +658,7 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
        comp->lo_entry_count = lsm->lsm_entry_count;
        comp->lo_preferred_mirror = -1;
 
-       if (equi(flr_state == LCM_FL_NOT_FLR, comp->lo_mirror_count > 1))
+       if (equi(flr_state == LCM_FL_NONE, comp->lo_mirror_count > 1))
                RETURN(-EINVAL);
 
        OBD_ALLOC(comp->lo_mirrors,
@@ -692,7 +696,7 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
                lle->lle_extent = &lle->lle_lsme->lsme_extent;
                lle->lle_valid = !(lle->lle_lsme->lsme_flags & LCME_FL_STALE);
 
-               if (flr_state != LCM_FL_NOT_FLR)
+               if (flr_state != LCM_FL_NONE)
                        mirror_id = mirror_id_of(lle->lle_lsme->lsme_id);
 
                lre = &comp->lo_mirrors[j];
@@ -717,8 +721,8 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
                /* entries must be sorted by mirrors */
                lre->lre_mirror_id = mirror_id;
                lre->lre_start = lre->lre_end = i;
-               lre->lre_preferred = (lle->lle_lsme->lsme_flags &
-                                       LCME_FL_PREFERRED);
+               lre->lre_preferred = !!(lle->lle_lsme->lsme_flags &
+                                       LCME_FL_PREF_RD);
                lre->lre_valid = lle->lle_valid;
                lre->lre_stale = !lle->lle_valid;
        }
@@ -756,24 +760,28 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
        if (psz > 0)
                cl_object_header(&lov->lo_cl)->coh_page_bufsize += psz;
 
-       /* decide the preferred mirror */
-       mirror_count = 0, i = 0;
-       lov_foreach_mirror_entry(lov, lre) {
-               i++;
+       /* decide the preferred mirror. It uses the hash value of lov_object
+        * so that different clients would use different mirrors for read. */
+       mirror_count = 0;
+       seq = hash_long((unsigned long)lov, 8);
+       for (i = 0; i < comp->lo_mirror_count; i++) {
+               unsigned int idx = (i + seq) % comp->lo_mirror_count;
+
+               lre = lov_mirror_entry(lov, idx);
                if (lre->lre_stale)
                        continue;
 
                mirror_count++; /* valid mirror */
 
                if (lre->lre_preferred || comp->lo_preferred_mirror < 0)
-                       comp->lo_preferred_mirror = i - 1;
+                       comp->lo_preferred_mirror = idx;
        }
-       if (mirror_count == 0) {
+       if (!mirror_count) {
                CDEBUG(D_INODE, DFID
                       " doesn't have any valid mirrors\n",
                       PFID(lu_object_fid(lov2lu(lov))));
 
-               GOTO(out, result = -EINVAL);
+               comp->lo_preferred_mirror = 0;
        }
 
        LASSERT(comp->lo_preferred_mirror >= 0);
@@ -805,10 +813,25 @@ static int lov_init_released(const struct lu_env *env,
        return 0;
 }
 
+static int lov_init_foreign(const struct lu_env *env,
+                           struct lov_device *dev, struct lov_object *lov,
+                           struct lov_stripe_md *lsm,
+                           const struct cl_object_conf *conf,
+                           union lov_layout_state *state)
+{
+       LASSERT(lsm != NULL);
+       LASSERT(lov->lo_type == LLT_FOREIGN);
+       LASSERT(lov->lo_lsm == NULL);
+
+       lov->lo_lsm = lsm_addref(lsm);
+       return 0;
+}
+
 static int lov_delete_empty(const struct lu_env *env, struct lov_object *lov,
                            union lov_layout_state *state)
 {
-       LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED);
+       LASSERT(lov->lo_type == LLT_EMPTY || lov->lo_type == LLT_RELEASED ||
+               lov->lo_type == LLT_FOREIGN);
 
        lov_layout_wait(env, lov);
        return 0;
@@ -930,6 +953,23 @@ static int lov_print_released(const struct lu_env *env, void *cookie,
        return 0;
 }
 
+static int lov_print_foreign(const struct lu_env *env, void *cookie,
+                               lu_printer_t p, const struct lu_object *o)
+{
+       struct lov_object       *lov = lu2lov(o);
+       struct lov_stripe_md    *lsm = lov->lo_lsm;
+
+       (*p)(env, cookie,
+               "foreign: %s, lsm{%p 0x%08X %d %u}:\n",
+               lov->lo_layout_invalid ? "invalid" : "valid", lsm,
+               lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
+               lsm->lsm_layout_gen);
+       (*p)(env, cookie,
+               "raw_ea_content '%.*s'\n",
+               (int)lsm->lsm_foreign_size, (char *)lsm_foreign(lsm));
+       return 0;
+}
+
 /**
  * Implements cl_object_operations::coo_attr_get() method for an object
  * without stripes (LLT_EMPTY layout type).
@@ -997,6 +1037,22 @@ static int lov_attr_get_composite(const struct lu_env *env,
        RETURN(0);
 }
 
+static int lov_flush_composite(const struct lu_env *env,
+                              struct cl_object *obj,
+                              struct ldlm_lock *lock)
+{
+       struct lov_object *lov = cl2lov(obj);
+       struct lovsub_object *lovsub;
+
+       ENTRY;
+
+       if (!lsme_is_dom(lov->lo_lsm->lsm_entries[0]))
+               RETURN(-EINVAL);
+
+       lovsub = lov->u.composite.lo_entries[0].lle_dom.lo_dom;
+       RETURN(cl_object_flush(env, lovsub2cl(lovsub), lock));
+}
+
 const static struct lov_layout_operations lov_dispatch[] = {
        [LLT_EMPTY] = {
                .llo_init      = lov_init_empty,
@@ -1027,6 +1083,17 @@ const static struct lov_layout_operations lov_dispatch[] = {
                .llo_lock_init = lov_lock_init_composite,
                .llo_io_init   = lov_io_init_composite,
                .llo_getattr   = lov_attr_get_composite,
+               .llo_flush     = lov_flush_composite,
+       },
+       [LLT_FOREIGN] = {
+               .llo_init      = lov_init_foreign,
+               .llo_delete    = lov_delete_empty,
+               .llo_fini      = lov_fini_released,
+               .llo_print     = lov_print_foreign,
+               .llo_page_init = lov_page_init_foreign,
+               .llo_lock_init = lov_lock_init_empty,
+               .llo_io_init   = lov_io_init_empty,
+               .llo_getattr   = lov_attr_get_empty,
        },
 };
 
@@ -1059,6 +1126,9 @@ static enum lov_layout_type lov_type(struct lov_stripe_md *lsm)
            lsm->lsm_magic == LOV_MAGIC_COMP_V1)
                return LLT_COMP;
 
+       if (lsm->lsm_magic == LOV_MAGIC_FOREIGN)
+               return LLT_FOREIGN;
+
        return LLT_EMPTY;
 }
 
@@ -1640,7 +1710,7 @@ int fiemap_for_stripe(const struct lu_env *env, struct cl_object *obj,
        if (lun_start == lun_end)
                return 0;
 
-       req_fm_len = obd_object_end - lun_start;
+       req_fm_len = obd_object_end - lun_start + 1;
        fs->fs_fm->fm_length = 0;
        len_mapped_single_call = 0;
 
@@ -1683,7 +1753,7 @@ int fiemap_for_stripe(const struct lu_env *env, struct cl_object *obj,
                        fs->fs_fm->fm_mapped_extents = 1;
 
                        fm_ext[0].fe_logical = lun_start;
-                       fm_ext[0].fe_length = obd_object_end - lun_start;
+                       fm_ext[0].fe_length = obd_object_end - lun_start + 1;
                        fm_ext[0].fe_flags |= FIEMAP_EXTENT_UNKNOWN;
 
                        goto inactive_tgt;
@@ -1798,8 +1868,11 @@ static int lov_object_fiemap(const struct lu_env *env, struct cl_object *obj,
        ENTRY;
 
        lsm = lov_lsm_addref(cl2lov(obj));
-       if (lsm == NULL)
-               RETURN(-ENODATA);
+       if (lsm == NULL) {
+               /* no extent: there is no object for mapping */
+               fiemap->fm_mapped_extents = 0;
+               return 0;
+       }
 
        if (!(fiemap->fm_flags & FIEMAP_FLAG_DEVICE_ORDER)) {
                /**
@@ -1990,6 +2063,7 @@ static int lov_object_layout_get(const struct lu_env *env,
        cl->cl_size = lov_comp_md_size(lsm);
        cl->cl_layout_gen = lsm->lsm_layout_gen;
        cl->cl_dom_comp_size = 0;
+       cl->cl_is_released = lsm->lsm_is_released;
        if (lsm_is_composite(lsm->lsm_magic)) {
                struct lov_stripe_md_entry *lsme = lsm->lsm_entries[0];
 
@@ -2023,6 +2097,12 @@ static loff_t lov_object_maxbytes(struct cl_object *obj)
        return maxbytes;
 }
 
+static int lov_object_flush(const struct lu_env *env, struct cl_object *obj,
+                           struct ldlm_lock *lock)
+{
+       return LOV_2DISPATCH_NOLOCK(cl2lov(obj), llo_flush, env, obj, lock);
+}
+
 static const struct cl_object_operations lov_ops = {
        .coo_page_init    = lov_page_init,
        .coo_lock_init    = lov_lock_init,
@@ -2034,6 +2114,7 @@ static const struct cl_object_operations lov_ops = {
        .coo_layout_get   = lov_object_layout_get,
        .coo_maxbytes     = lov_object_maxbytes,
        .coo_fiemap       = lov_object_fiemap,
+       .coo_object_flush = lov_object_flush
 };
 
 static const struct lu_object_operations lov_lu_obj_ops = {
@@ -2127,6 +2208,8 @@ int lov_read_and_clear_async_rc(struct cl_object *clob)
                }
                case LLT_RELEASED:
                case LLT_EMPTY:
+                       /* fall through */
+               case LLT_FOREIGN:
                        break;
                default:
                        LBUG();