Whamcloud - gitweb
LU-7422 mdt: fix ENOENT handling in mdt_intent_reint
[fs/lustre-release.git] / lustre / lod / lod_lov.c
index 7b3a373..6d6396d 100644 (file)
@@ -23,7 +23,7 @@
  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
  * Use is subject to license terms.
  *
- * Copyright (c) 2012, 2014, Intel Corporation.
+ * Copyright (c) 2012, 2015, Intel Corporation.
  */
 /*
  * lustre/lod/lod_lov.c
@@ -210,13 +210,15 @@ int lod_add_device(const struct lu_env *env, struct lod_device *lod,
        struct obd_connect_data *data = NULL;
        struct obd_export       *exp = NULL;
        struct obd_device       *obd;
-       struct lu_device        *ldev;
-       struct dt_device        *d;
+       struct lu_device        *lu_dev;
+       struct dt_device        *dt_dev;
        int                      rc;
        struct lod_tgt_desc     *tgt_desc;
        struct lod_tgt_descs    *ltd;
+       struct lustre_cfg       *lcfg;
        struct obd_uuid         obd_uuid;
        bool                    for_ost;
+       bool lock = false;
        ENTRY;
 
        CDEBUG(D_CONFIG, "osp:%s idx:%d gen:%d\n", osp, index, gen);
@@ -236,9 +238,15 @@ int lod_add_device(const struct lu_env *env, struct lod_device *lod,
                RETURN(-EINVAL);
        }
 
+       LASSERT(obd->obd_lu_dev != NULL);
+       LASSERT(obd->obd_lu_dev->ld_site == lod->lod_dt_dev.dd_lu_dev.ld_site);
+
+       lu_dev = obd->obd_lu_dev;
+       dt_dev = lu2dt_dev(lu_dev);
+
        OBD_ALLOC_PTR(data);
        if (data == NULL)
-               RETURN(-ENOMEM);
+               GOTO(out_cleanup, rc = -ENOMEM);
 
        data->ocd_connect_flags = OBD_CONNECT_INDEX | OBD_CONNECT_VERSION;
        data->ocd_version = LUSTRE_VERSION_CODE;
@@ -253,14 +261,14 @@ int lod_add_device(const struct lu_env *env, struct lod_device *lod,
                                           OBD_CONNECT_LRU_RESIZE |
 #endif
                                           OBD_CONNECT_MDS |
-                                          OBD_CONNECT_OSS_CAPA |
                                           OBD_CONNECT_REQPORTAL |
                                           OBD_CONNECT_SKIP_ORPHAN |
                                           OBD_CONNECT_FID |
                                           OBD_CONNECT_LVB_TYPE |
                                           OBD_CONNECT_VERSION |
                                           OBD_CONNECT_PINGLESS |
-                                          OBD_CONNECT_LFSCK;
+                                          OBD_CONNECT_LFSCK |
+                                          OBD_CONNECT_BULK_MBITS;
 
                data->ocd_group = tgt_index;
                ltd = &lod->lod_ost_descs;
@@ -270,17 +278,13 @@ int lod_add_device(const struct lu_env *env, struct lod_device *lod,
                for_ost = false;
                data->ocd_ibits_known = MDS_INODELOCK_UPDATE;
                data->ocd_connect_flags |= OBD_CONNECT_ACL |
-                                          OBD_CONNECT_MDS_CAPA |
-                                          OBD_CONNECT_OSS_CAPA |
                                           OBD_CONNECT_IBITS |
                                           OBD_CONNECT_MDS_MDS |
                                           OBD_CONNECT_FID |
                                           OBD_CONNECT_AT |
                                           OBD_CONNECT_FULL20 |
-                                          OBD_CONNECT_LFSCK;
-               /* XXX set MDS-MDS flags, remove this when running this
-                * on client*/
-               data->ocd_connect_flags |= OBD_CONNECT_MDS_MDS;
+                                          OBD_CONNECT_LFSCK |
+                                          OBD_CONNECT_BULK_MBITS;
                spin_lock(&imp->imp_lock);
                imp->imp_server_timeout = 1;
                spin_unlock(&imp->imp_lock);
@@ -295,21 +299,15 @@ int lod_add_device(const struct lu_env *env, struct lod_device *lod,
        if (rc) {
                CERROR("%s: cannot connect to next dev %s (%d)\n",
                       obd->obd_name, osp, rc);
-               GOTO(out_free, rc);
+               GOTO(out_cleanup, rc);
        }
 
-       LASSERT(obd->obd_lu_dev);
-       LASSERT(obd->obd_lu_dev->ld_site == lod->lod_dt_dev.dd_lu_dev.ld_site);
-
-       ldev = obd->obd_lu_dev;
-       d = lu2dt_dev(ldev);
-
        /* Allocate ost descriptor and fill it */
        OBD_ALLOC_PTR(tgt_desc);
        if (!tgt_desc)
                GOTO(out_conn, rc = -ENOMEM);
 
-       tgt_desc->ltd_tgt    = d;
+       tgt_desc->ltd_tgt    = dt_dev;
        tgt_desc->ltd_exp    = exp;
        tgt_desc->ltd_uuid   = obd->u.cli.cl_target_uuid;
        tgt_desc->ltd_gen    = gen;
@@ -337,6 +335,7 @@ int lod_add_device(const struct lu_env *env, struct lod_device *lod,
        }
 
        mutex_lock(&ltd->ltd_mutex);
+       lock = true;
        if (cfs_bitmap_check(ltd->ltd_tgt_bitmap, index)) {
                CERROR("%s: device %d is registered already\n", obd->obd_name,
                       index);
@@ -352,7 +351,7 @@ int lod_add_device(const struct lu_env *env, struct lod_device *lod,
                }
        }
 
-       if (!strcmp(LUSTRE_OSC_NAME, type)) {
+       if (for_ost) {
                /* pool and qos are not supported for MDS stack yet */
                rc = lod_ost_pool_add(&lod->lod_pool_info, index,
                                      lod->lod_osts_size);
@@ -381,26 +380,62 @@ int lod_add_device(const struct lu_env *env, struct lod_device *lod,
        ltd->ltd_tgtnr++;
        mutex_unlock(&ltd->ltd_mutex);
        lod_putref(lod, ltd);
+       lock = false;
        if (lod->lod_recovery_completed)
-               ldev->ld_ops->ldo_recovery_complete(env, ldev);
+               lu_dev->ld_ops->ldo_recovery_complete(env, lu_dev);
+
+       if (!for_ost && lod->lod_initialized) {
+               rc = lod_sub_init_llog(env, lod, tgt_desc->ltd_tgt);
+               if (rc != 0) {
+                       CERROR("%s: cannot start llog on %s:rc = %d\n",
+                              lod2obd(lod)->obd_name, osp, rc);
+                       GOTO(out_ltd, rc);
+               }
+       }
 
-       rc = lfsck_add_target(env, lod->lod_child, d, exp, index, for_ost);
-       if (rc != 0)
+       rc = lfsck_add_target(env, lod->lod_child, dt_dev, exp, index, for_ost);
+       if (rc != 0) {
                CERROR("Fail to add LFSCK target: name = %s, type = %s, "
                       "index = %u, rc = %d\n", osp, type, index, rc);
-
+               GOTO(out_fini_llog, rc);
+       }
        RETURN(rc);
+out_fini_llog:
+       lod_sub_fini_llog(env, tgt_desc->ltd_tgt,
+                         tgt_desc->ltd_recovery_thread);
+out_ltd:
+       lod_getref(ltd);
+       mutex_lock(&ltd->ltd_mutex);
+       lock = true;
+       if (!for_ost && LTD_TGT(ltd, index)->ltd_recovery_thread != NULL) {
+               struct ptlrpc_thread *thread;
 
+               thread = LTD_TGT(ltd, index)->ltd_recovery_thread;
+               OBD_FREE_PTR(thread);
+       }
+       ltd->ltd_tgtnr--;
+       cfs_bitmap_clear(ltd->ltd_tgt_bitmap, index);
+       LTD_TGT(ltd, index) = NULL;
 out_pool:
        lod_ost_pool_remove(&lod->lod_pool_info, index);
 out_mutex:
-       mutex_unlock(&ltd->ltd_mutex);
-       lod_putref(lod, ltd);
+       if (lock) {
+               mutex_unlock(&ltd->ltd_mutex);
+               lod_putref(lod, ltd);
+       }
 out_desc:
        OBD_FREE_PTR(tgt_desc);
 out_conn:
        obd_disconnect(exp);
-out_free:
+out_cleanup:
+       /* XXX OSP needs us to send down LCFG_CLEANUP because it uses
+        * objects from the MDT stack. See LU-7184. */
+       lcfg = &lod_env_info(env)->lti_lustre_cfg;
+       memset(lcfg, 0, sizeof(*lcfg));
+       lcfg->lcfg_version = LUSTRE_CFG_VERSION;
+       lcfg->lcfg_command = LCFG_CLEANUP;
+       lu_dev->ld_ops->ldo_process_config(env, lu_dev, lcfg);
+
        return rc;
 }
 
@@ -426,6 +461,13 @@ static void __lod_del_device(const struct lu_env *env, struct lod_device *lod,
        lfsck_del_target(env, lod->lod_child, LTD_TGT(ltd, idx)->ltd_tgt,
                         idx, for_ost);
 
+       if (!for_ost && LTD_TGT(ltd, idx)->ltd_recovery_thread != NULL) {
+               struct ptlrpc_thread *thread;
+
+               thread = LTD_TGT(ltd, idx)->ltd_recovery_thread;
+               OBD_FREE_PTR(thread);
+       }
+
        if (LTD_TGT(ltd, idx)->ltd_reap == 0) {
                LTD_TGT(ltd, idx)->ltd_reap = 1;
                ltd->ltd_death_row++;
@@ -661,7 +703,11 @@ int lod_generate_and_set_lovea(const struct lu_env *env,
 
                ostid_cpu_to_le(&info->lti_ostid, &objs[i].l_ost_oi);
                objs[i].l_ost_gen    = cpu_to_le32(0);
-               rc = lod_fld_lookup(env, lod, fid, &index, &type);
+               if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FLD_LOOKUP))
+                       rc = -ENOENT;
+               else
+                       rc = lod_fld_lookup(env, lod, fid,
+                                           &index, &type);
                if (rc < 0) {
                        CERROR("%s: Can not locate "DFID": rc = %d\n",
                               lod2obd(lod)->obd_name, PFID(fid), rc);
@@ -673,10 +719,12 @@ int lod_generate_and_set_lovea(const struct lu_env *env,
 
        info->lti_buf.lb_buf = lmm;
        info->lti_buf.lb_len = lmm_size;
-       rc = dt_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV, 0,
-                         th, BYPASS_CAPA);
-       if (rc < 0)
+       rc = lod_sub_object_xattr_set(env, next, &info->lti_buf, XATTR_NAME_LOV,
+                                     0, th);
+       if (rc < 0) {
                lod_object_free_striping(env, lo);
+               RETURN(rc);
+       }
 
        RETURN(rc);
 }
@@ -712,7 +760,7 @@ int lod_get_ea(const struct lu_env *env, struct lod_object *lo,
 repeat:
                info->lti_buf.lb_buf = info->lti_ea_store;
                info->lti_buf.lb_len = info->lti_ea_store_size;
-               rc = dt_xattr_get(env, next, &info->lti_buf, name, BYPASS_CAPA);
+               rc = dt_xattr_get(env, next, &info->lti_buf, name);
        }
 
        /* if object is not striped or inaccessible */
@@ -721,8 +769,7 @@ repeat:
 
        if (rc == -ERANGE) {
                /* EA doesn't fit, reallocate new buffer */
-               rc = dt_xattr_get(env, next, &LU_BUF_NULL, name,
-                                 BYPASS_CAPA);
+               rc = dt_xattr_get(env, next, &LU_BUF_NULL, name);
                if (rc == -ENODATA || rc == -ENOENT)
                        RETURN(0);
                else if (rc < 0)
@@ -992,6 +1039,9 @@ int lod_load_striping_locked(const struct lu_env *env, struct lod_object *lo)
                 */
                rc = lod_parse_dir_striping(env, lo, buf);
        }
+
+       if (rc == 0)
+               lo->ldo_striping_cached = 1;
 out:
        RETURN(rc);
 }
@@ -1295,6 +1345,7 @@ int lod_pools_init(struct lod_device *lod, struct lustre_cfg *lcfg)
        rc = lod_ost_pool_init(&lod->lod_pool_info, 0);
        if (rc)
                GOTO(out_hash, rc);
+       lod_qos_rr_init(&lod->lod_qos.lq_rr);
        rc = lod_ost_pool_init(&lod->lod_qos.lq_rr.lqr_pool, 0);
        if (rc)
                GOTO(out_pool_info, rc);