Whamcloud - gitweb
LU-14182 lov: cancel layout lock on replay deadlock
[fs/lustre-release.git] / lustre / lov / lov_object.c
index eb385d3..d3186cd 100644 (file)
@@ -178,7 +178,7 @@ static int lov_init_sub(const struct lu_env *env, struct lov_object *lov,
                old_obj = lu_object_locate(&parent->coh_lu, &lov_device_type);
                LASSERT(old_obj != NULL);
                old_lov = cl2lov(lu2cl(old_obj));
-               if (old_lov->lo_layout_invalid) {
+               if (test_bit(LO_LAYOUT_INVALID, &old_lov->lo_obj_flags)) {
                        /* the object's layout has already changed but isn't
                         * refreshed */
                        lu_object_unhash(env, &subobj->co_lu);
@@ -216,6 +216,7 @@ static int lov_init_raid0(const struct lu_env *env, struct lov_device *dev,
 
        spin_lock_init(&r0->lo_sub_lock);
        r0->lo_nr = lse->lsme_stripe_count;
+       r0->lo_trunc_stripeno = -1;
 
        OBD_ALLOC_PTR_ARRAY_LARGE(r0->lo_sub, r0->lo_nr);
        if (r0->lo_sub == NULL)
@@ -632,7 +633,7 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
        LASSERT(lsm->lsm_entry_count > 0);
        LASSERT(lov->lo_lsm == NULL);
        lov->lo_lsm = lsm_addref(lsm);
-       lov->lo_layout_invalid = true;
+       set_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags);
 
        dump_lsm(D_INODE, lsm);
 
@@ -682,6 +683,9 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
                        }
                        lle->lle_comp_ops = &dom_ops;
                        break;
+               case LOV_PATTERN_FOREIGN:
+                       lle->lle_comp_ops = NULL;
+                       break;
                default:
                        CERROR("%s: unknown composite layout entry type %i\n",
                               lov2obd(dev->ld_lov)->obd_name,
@@ -701,6 +705,8 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
                        if (mirror_id == lre->lre_mirror_id) {
                                lre->lre_valid |= lle->lle_valid;
                                lre->lre_stale |= !lle->lle_valid;
+                               lre->lre_foreign |=
+                                       lsme_is_foreign(lle->lle_lsme);
                                lre->lre_end = i;
                                continue;
                        }
@@ -722,6 +728,7 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
                                        LCME_FL_PREF_RD);
                lre->lre_valid = lle->lle_valid;
                lre->lre_stale = !lle->lle_valid;
+               lre->lre_foreign = lsme_is_foreign(lle->lle_lsme);
        }
 
        /* sanity check for FLR */
@@ -745,6 +752,9 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
                if (!lsme_inited(lle->lle_lsme))
                        continue;
 
+               if (lsme_is_foreign(lle->lle_lsme))
+                       continue;
+
                result = lle->lle_comp_ops->lco_init(env, dev, lov, index,
                                                     conf, lle);
                if (result < 0)
@@ -768,6 +778,9 @@ static int lov_init_composite(const struct lu_env *env, struct lov_device *dev,
                if (lre->lre_stale)
                        continue;
 
+               if (lre->lre_foreign)
+                       continue;
+
                mirror_count++; /* valid mirror */
 
                if (lre->lre_preferred || comp->lo_preferred_mirror < 0)
@@ -847,8 +860,12 @@ static int lov_delete_composite(const struct lu_env *env,
 
        lov_layout_wait(env, lov);
        if (comp->lo_entries)
-               lov_foreach_layout_entry(lov, entry)
+               lov_foreach_layout_entry(lov, entry) {
+                       if (entry->lle_lsme && lsme_is_foreign(entry->lle_lsme))
+                               continue;
+
                        lov_delete_raid0(env, lov, entry);
+       }
 
        RETURN(0);
 }
@@ -902,7 +919,8 @@ static void lov_fini_released(const struct lu_env *env, struct lov_object *lov,
 static int lov_print_empty(const struct lu_env *env, void *cookie,
                            lu_printer_t p, const struct lu_object *o)
 {
-        (*p)(env, cookie, "empty %d\n", lu2lov(o)->lo_layout_invalid);
+        (*p)(env, cookie, "empty %d\n",
+            test_bit(LO_LAYOUT_INVALID, &lu2lov(o)->lo_obj_flags));
         return 0;
 }
 
@@ -915,8 +933,8 @@ static int lov_print_composite(const struct lu_env *env, void *cookie,
 
        (*p)(env, cookie, "entries: %d, %s, lsm{%p 0x%08X %d %u}:\n",
             lsm->lsm_entry_count,
-            lov->lo_layout_invalid ? "invalid" : "valid", lsm,
-            lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
+            test_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags) ? "invalid" :
+            "valid", lsm, lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
             lsm->lsm_layout_gen);
 
        for (i = 0; i < lsm->lsm_entry_count; i++) {
@@ -929,7 +947,9 @@ static int lov_print_composite(const struct lu_env *env, void *cookie,
                     lse->lsme_id, lse->lsme_pattern, lse->lsme_layout_gen,
                     lse->lsme_flags, lse->lsme_stripe_count,
                     lse->lsme_stripe_size);
-               lov_print_raid0(env, cookie, p, lle);
+
+               if (!lsme_is_foreign(lse))
+                       lov_print_raid0(env, cookie, p, lle);
        }
 
        return 0;
@@ -943,8 +963,8 @@ static int lov_print_released(const struct lu_env *env, void *cookie,
 
        (*p)(env, cookie,
                "released: %s, lsm{%p 0x%08X %d %u}:\n",
-               lov->lo_layout_invalid ? "invalid" : "valid", lsm,
-               lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
+               test_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags) ? "invalid" :
+               "valid", lsm, lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
                lsm->lsm_layout_gen);
        return 0;
 }
@@ -957,7 +977,8 @@ static int lov_print_foreign(const struct lu_env *env, void *cookie,
 
        (*p)(env, cookie,
                "foreign: %s, lsm{%p 0x%08X %d %u}:\n",
-               lov->lo_layout_invalid ? "invalid" : "valid", lsm,
+               test_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags) ?
+               "invalid" : "valid", lsm,
                lsm->lsm_magic, atomic_read(&lsm->lsm_refc),
                lsm->lsm_layout_gen);
        (*p)(env, cookie,
@@ -1353,14 +1374,14 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
                dump_lsm(D_INODE, lsm);
        }
 
-       lov_conf_lock(lov);
        if (conf->coc_opc == OBJECT_CONF_INVALIDATE) {
-               lov->lo_layout_invalid = true;
-               GOTO(out, result = 0);
+               set_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags);
+               GOTO(out_lsm, result = 0);
        }
 
+       lov_conf_lock(lov);
        if (conf->coc_opc == OBJECT_CONF_WAIT) {
-               if (lov->lo_layout_invalid &&
+               if (test_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags) &&
                    atomic_read(&lov->lo_active_ios) > 0) {
                        lov_conf_unlock(lov);
                        result = lov_layout_wait(env, lov);
@@ -1374,28 +1395,34 @@ static int lov_conf_set(const struct lu_env *env, struct cl_object *obj,
        if ((lsm == NULL && lov->lo_lsm == NULL) ||
            ((lsm != NULL && lov->lo_lsm != NULL) &&
             (lov->lo_lsm->lsm_layout_gen == lsm->lsm_layout_gen) &&
+            (lov->lo_lsm->lsm_flags == lsm->lsm_flags) &&
             (lov->lo_lsm->lsm_entries[0]->lsme_pattern ==
              lsm->lsm_entries[0]->lsme_pattern))) {
                /* same version of layout */
-               lov->lo_layout_invalid = false;
+               clear_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags);
                GOTO(out, result = 0);
        }
 
        /* will change layout - check if there still exists active IO. */
        if (atomic_read(&lov->lo_active_ios) > 0) {
-               lov->lo_layout_invalid = true;
+               set_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags);
                GOTO(out, result = -EBUSY);
        }
 
        result = lov_layout_change(env, lov, lsm, conf);
-       lov->lo_layout_invalid = result != 0;
+       if (result)
+               set_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags);
+       else
+               clear_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags);
        EXIT;
 
 out:
        lov_conf_unlock(lov);
+out_lsm:
        lov_lsm_put(lsm);
-       CDEBUG(D_INODE, DFID" lo_layout_invalid=%d\n",
-              PFID(lu_object_fid(lov2lu(lov))), lov->lo_layout_invalid);
+       CDEBUG(D_INODE, DFID" lo_layout_invalid=%u\n",
+              PFID(lu_object_fid(lov2lu(lov))),
+              test_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags));
        RETURN(result);
 }
 
@@ -2146,7 +2173,8 @@ static int lov_object_layout_get(const struct lu_env *env,
        rc = lov_lsm_pack(lsm, buf->lb_buf, buf->lb_len);
        lov_lsm_put(lsm);
 
-       RETURN(rc < 0 ? rc : 0);
+       /* return error or number of bytes */
+       RETURN(rc);
 }
 
 static loff_t lov_object_maxbytes(struct cl_object *obj)
@@ -2229,7 +2257,8 @@ static struct lov_stripe_md *lov_lsm_addref(struct lov_object *lov)
                lsm = lsm_addref(lov->lo_lsm);
                CDEBUG(D_INODE, "lsm %p addref %d/%d by %p.\n",
                        lsm, atomic_read(&lsm->lsm_refc),
-                       lov->lo_layout_invalid, current);
+                       test_bit(LO_LAYOUT_INVALID, &lov->lo_obj_flags),
+                       current);
        }
        lov_conf_thaw(lov);
        return lsm;