Whamcloud - gitweb
LU-8900 snapshot: simulate readonly device 67/24267/15
authorFan Yong <fan.yong@intel.com>
Fri, 4 Nov 2016 12:10:12 +0000 (20:10 +0800)
committerOleg Drokin <oleg.drokin@intel.com>
Thu, 23 Mar 2017 01:38:57 +0000 (01:38 +0000)
Introduce new server-side mount option: rdonly_dev. Under the
device readonly mode, the Lustre kernel threads that may cause
system modification, such as lfsck, scrub, osp_precreate_thread,
osp_sync_thread, and so on, will be disabled when mount. And the
system modification for recovery and client connect/disconnect,
will be ignored also.

On the other hand, if the server is mounted as "rdonly_dev", then
the client must specify "-o ro" option when mount; otherwise, it
will get -EACCES.

Signed-off-by: Fan Yong <fan.yong@intel.com>
Change-Id: Iafc02bf53c41b9d7f08120ba7639b91ae527bbbc
Reviewed-on: https://review.whamcloud.com/24267
Tested-by: Jenkins
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: Niu Yawei <yawei.niu@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
43 files changed:
lustre/fid/fid_handler.c
lustre/fid/fid_store.c
lustre/fld/fld_index.c
lustre/include/dt_object.h
lustre/include/lu_target.h
lustre/include/lustre_disk.h
lustre/include/lustre_net.h
lustre/ldlm/ldlm_lib.c
lustre/lfsck/lfsck_lib.c
lustre/lod/lod_dev.c
lustre/mdd/mdd_device.c
lustre/mdt/mdt_coordinator.c
lustre/mdt/mdt_handler.c
lustre/mdt/mdt_hsm_cdt_client.c
lustre/mdt/mdt_internal.h
lustre/mdt/mdt_open.c
lustre/mgc/mgc_request.c
lustre/mgs/mgs_llog.c
lustre/mgs/mgs_nids.c
lustre/obdclass/obd_mount.c
lustre/ofd/ofd_dev.c
lustre/ofd/ofd_fs.c
lustre/ofd/ofd_obd.c
lustre/osd-ldiskfs/osd_handler.c
lustre/osd-ldiskfs/osd_scrub.c
lustre/osd-zfs/osd_handler.c
lustre/osd-zfs/osd_index.c
lustre/osd-zfs/osd_object.c
lustre/osd-zfs/osd_oi.c
lustre/osp/osp_dev.c
lustre/osp/osp_precreate.c
lustre/osp/osp_sync.c
lustre/ptlrpc/nodemap_storage.c
lustre/quota/qmt_dev.c
lustre/quota/qsd_handler.c
lustre/quota/qsd_lib.c
lustre/quota/qsd_reint.c
lustre/target/out_handler.c
lustre/target/tgt_lastrcvd.c
lustre/target/update_trans.c
lustre/tests/sanity.sh
lustre/tests/test-framework.sh
lustre/utils/mount_utils_zfs.c

index c1182bd..537ea6d 100644 (file)
@@ -364,6 +364,7 @@ static int seq_server_handle(struct lu_site *site,
 {
        int rc;
        struct seq_server_site *ss_site;
+       struct dt_device *dev;
        ENTRY;
 
        ss_site = lu_site2seq(site);
@@ -375,6 +376,11 @@ static int seq_server_handle(struct lu_site *site,
                               "initialized\n");
                        RETURN(-EINVAL);
                }
+
+               dev = lu2dt_dev(ss_site->ss_server_seq->lss_obj->do_lu.lo_dev);
+               if (dev->dd_rdonly)
+                       RETURN(-EROFS);
+
                rc = seq_server_alloc_meta(ss_site->ss_server_seq, out, env);
                break;
        case SEQ_ALLOC_SUPER:
@@ -383,6 +389,11 @@ static int seq_server_handle(struct lu_site *site,
                               "initialized\n");
                        RETURN(-EINVAL);
                }
+
+               dev = lu2dt_dev(ss_site->ss_control_seq->lss_obj->do_lu.lo_dev);
+               if (dev->dd_rdonly)
+                       RETURN(-EROFS);
+
                rc = seq_server_alloc_super(ss_site->ss_control_seq, out, env);
                break;
        default:
index 8cd6c41..fd85270 100644 (file)
@@ -107,6 +107,9 @@ int seq_store_update(const struct lu_env *env, struct lu_server_seq *seq,
        loff_t pos = 0;
        int rc;
 
+       if (dt_dev->dd_rdonly)
+               RETURN(0);
+
        info = lu_context_key_get(&env->le_ctx, &seq_thread_key);
        LASSERT(info != NULL);
 
index 653effb..ffeca9b 100644 (file)
@@ -271,12 +271,16 @@ int fld_insert_entry(const struct lu_env *env,
                     const struct lu_seq_range *range)
 {
        struct thandle *th;
+       struct dt_device *dt = lu2dt_dev(fld->lsf_obj->do_lu.lo_dev);
        int rc;
        ENTRY;
 
        LASSERT(mutex_is_locked(&fld->lsf_lock));
 
-       th = dt_trans_create(env, lu2dt_dev(fld->lsf_obj->do_lu.lo_dev));
+       if (dt->dd_rdonly)
+               RETURN(0);
+
+       th = dt_trans_create(env, dt);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
 
@@ -287,8 +291,7 @@ int fld_insert_entry(const struct lu_env *env,
                GOTO(out, rc);
        }
 
-       rc = dt_trans_start_local(env, lu2dt_dev(fld->lsf_obj->do_lu.lo_dev),
-                                 th);
+       rc = dt_trans_start_local(env, dt, th);
        if (rc)
                GOTO(out, rc);
 
@@ -296,7 +299,7 @@ int fld_insert_entry(const struct lu_env *env,
        if (rc == -EEXIST)
                rc = 0;
 out:
-       dt_trans_stop(env, lu2dt_dev(fld->lsf_obj->do_lu.lo_dev), th);
+       dt_trans_stop(env, dt, th);
        RETURN(rc);
 }
 EXPORT_SYMBOL(fld_insert_entry);
index 588d0c3..537ee16 100644 (file)
@@ -1740,7 +1740,8 @@ struct dt_device {
          * single-threaded start-up shut-down procedures.
          */
        struct list_head                   dd_txn_callbacks;
-       unsigned int                       dd_record_fid_accessed:1;
+       unsigned int                       dd_record_fid_accessed:1,
+                                          dd_rdonly:1;
 };
 
 int  dt_device_init(struct dt_device *dev, struct lu_device_type *t);
index cc6a4e3..295902b 100644 (file)
@@ -436,9 +436,6 @@ void tgt_boot_epoch_update(struct lu_target *lut);
 void tgt_save_slc_lock(struct lu_target *lut, struct ldlm_lock *lock,
                       __u64 transno);
 void tgt_discard_slc_lock(struct lu_target *lut, struct ldlm_lock *lock);
-int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *lut,
-                          struct obd_export *exp, __u64 transno);
-int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp);
 int tgt_init(const struct lu_env *env, struct lu_target *lut,
             struct obd_device *obd, struct dt_device *dt,
             struct tgt_opc_slice *slice,
@@ -449,17 +446,8 @@ void tgt_client_free(struct obd_export *exp);
 int tgt_client_del(const struct lu_env *env, struct obd_export *exp);
 int tgt_client_add(const struct lu_env *env, struct obd_export *exp, int);
 int tgt_client_new(const struct lu_env *env, struct obd_export *exp);
-int tgt_client_data_read(const struct lu_env *env, struct lu_target *tg,
-                        struct lsd_client_data *lcd, loff_t *off, int index);
-int tgt_client_data_write(const struct lu_env *env, struct lu_target *tg,
-                         struct lsd_client_data *lcd, loff_t *off, struct thandle *th);
-int tgt_server_data_read(const struct lu_env *env, struct lu_target *tg);
-int tgt_server_data_write(const struct lu_env *env, struct lu_target *tg,
-                         struct thandle *th);
 int tgt_server_data_update(const struct lu_env *env, struct lu_target *tg,
                           int sync);
-int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tg,
-                          loff_t off);
 int tgt_reply_data_init(const struct lu_env *env, struct lu_target *tgt);
 bool tgt_lookup_reply(struct ptlrpc_request *req, struct tg_reply_data *trd);
 int tgt_add_reply_data(const struct lu_env *env, struct lu_target *tgt,
index 7e86ca0..2841f6a 100644 (file)
@@ -259,6 +259,7 @@ struct lustre_mount_data {
 #define LMD_FLG_VIRGIN         0x1000  /* the service registers first time */
 #define LMD_FLG_UPDATE         0x2000  /* update parameters */
 #define LMD_FLG_HSM            0x4000  /* Start coordinator */
+#define LMD_FLG_DEV_RDONLY     0x8000  /* discard modification quitely */
 
 #define lmd_is_client(x) ((x)->lmd_flags & LMD_FLG_CLIENT)
 
index ef5e7ba..eee74a6 100644 (file)
@@ -1494,12 +1494,13 @@ struct ptlrpc_bulk_desc {
 #define BD_GET_ENC_KVEC(desc, i)       ((desc)->bd_u.bd_kvec.bd_enc_kvec[i])
 
 enum {
-        SVC_STOPPED     = 1 << 0,
-        SVC_STOPPING    = 1 << 1,
-        SVC_STARTING    = 1 << 2,
-        SVC_RUNNING     = 1 << 3,
-        SVC_EVENT       = 1 << 4,
-        SVC_SIGNAL      = 1 << 5,
+       SVC_INIT        = 0,
+       SVC_STOPPED     = 1 << 0,
+       SVC_STOPPING    = 1 << 1,
+       SVC_STARTING    = 1 << 2,
+       SVC_RUNNING     = 1 << 3,
+       SVC_EVENT       = 1 << 4,
+       SVC_SIGNAL      = 1 << 5,
 };
 
 #define PTLRPC_THR_NAME_LEN            32
index e2b83d0..65a8698 100644 (file)
@@ -2629,13 +2629,16 @@ static void target_recovery_expired(unsigned long castmeharder)
 
 void target_recovery_init(struct lu_target *lut, svc_handler_t handler)
 {
-        struct obd_device *obd = lut->lut_obd;
+       struct obd_device *obd = lut->lut_obd;
 
-        if (obd->obd_max_recoverable_clients == 0) {
-                /** Update server last boot epoch */
-                tgt_boot_epoch_update(lut);
-                return;
-        }
+       if (lut->lut_bottom->dd_rdonly)
+               return;
+
+       if (obd->obd_max_recoverable_clients == 0) {
+               /** Update server last boot epoch */
+               tgt_boot_epoch_update(lut);
+               return;
+       }
 
        CDEBUG(D_HA, "RECOVERY: service %s, %d recoverable clients, "
               "last_transno %llu\n", obd->obd_name,
index 98f7e24..6bf6e68 100644 (file)
@@ -3046,6 +3046,9 @@ int lfsck_start(const struct lu_env *env, struct dt_device *key,
        __u16                            type   = 1;
        ENTRY;
 
+       if (key->dd_rdonly)
+               RETURN(-EROFS);
+
        lfsck = lfsck_instance_find(key, true, false);
        if (unlikely(lfsck == NULL))
                RETURN(-ENXIO);
index 71be4e2..bb1a90c 100644 (file)
@@ -385,7 +385,7 @@ static int lod_sub_recovery_thread(void *arg)
 
 again:
        rc = lod_sub_prep_llog(&env, lod, dt, lrd->lrd_idx);
-       if (rc == 0) {
+       if (!rc && !lod->lod_child->dd_rdonly) {
                /* Process the recovery record */
                ctxt = llog_get_context(dt->dd_lu_dev.ld_obd,
                                        LLOG_UPDATELOG_ORIG_CTXT);
index 7192e6c..ca63154 100644 (file)
@@ -881,7 +881,11 @@ static void mdd_device_shutdown(const struct lu_env *env, struct mdd_device *m,
        mdd_changelog_fini(env, m);
        orph_index_fini(env, m);
        mdd_dot_lustre_cleanup(env, m);
-       nm_config_file_deregister_tgt(env, mdd2obd_dev(m)->u.obt.obt_nodemap_config_file);
+       if (mdd2obd_dev(m)->u.obt.obt_nodemap_config_file) {
+               nm_config_file_deregister_tgt(env,
+                               mdd2obd_dev(m)->u.obt.obt_nodemap_config_file);
+               mdd2obd_dev(m)->u.obt.obt_nodemap_config_file = NULL;
+       }
        if (m->mdd_los != NULL) {
                local_oid_storage_fini(env, m->mdd_los);
                m->mdd_los = NULL;
@@ -946,7 +950,8 @@ static int mdd_recovery_complete(const struct lu_env *env,
        next = &mdd->mdd_child->dd_lu_dev;
 
         /* XXX: orphans handling. */
-        mdd_orphan_cleanup(env, mdd);
+       if (!mdd->mdd_bottom->dd_rdonly)
+               mdd_orphan_cleanup(env, mdd);
         rc = next->ld_ops->ldo_recovery_complete(env, next);
 
         RETURN(rc);
@@ -999,6 +1004,7 @@ static int mdd_prepare(const struct lu_env *env,
        struct mdd_device *mdd = lu2mdd_dev(cdev);
        struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
        struct nm_config_file *nodemap_config;
+       struct obd_device_target *obt = &mdd2obd_dev(mdd)->u.obt;
        struct lu_fid fid;
        int rc;
 
@@ -1062,10 +1068,13 @@ static int mdd_prepare(const struct lu_env *env,
 
        nodemap_config = nm_config_file_register_tgt(env, mdd->mdd_bottom,
                                                     mdd->mdd_los);
-       if (IS_ERR(nodemap_config))
-               GOTO(out_hsm, rc = PTR_ERR(nodemap_config));
-
-       mdd2obd_dev(mdd)->u.obt.obt_nodemap_config_file = nodemap_config;
+       if (IS_ERR(nodemap_config)) {
+               rc = PTR_ERR(nodemap_config);
+               if (rc != -EROFS)
+                       GOTO(out_hsm, rc);
+       } else {
+               obt->obt_nodemap_config_file = nodemap_config;
+       }
 
        rc = lfsck_register(env, mdd->mdd_bottom, mdd->mdd_child,
                            mdd2obd_dev(mdd), mdd_lfsck_out_notify,
@@ -1088,8 +1097,8 @@ static int mdd_prepare(const struct lu_env *env,
 out_lfsck:
        lfsck_degister(env, mdd->mdd_bottom);
 out_nodemap:
-       nm_config_file_deregister_tgt(env, mdd2obd_dev(mdd)->u.obt.obt_nodemap_config_file);
-       mdd2obd_dev(mdd)->u.obt.obt_nodemap_config_file = NULL;
+       nm_config_file_deregister_tgt(env, obt->obt_nodemap_config_file);
+       obt->obt_nodemap_config_file = NULL;
 out_hsm:
        mdd_hsm_actions_llog_fini(env, mdd);
 out_changelog:
index 851e1f3..a981809 100644 (file)
@@ -882,6 +882,9 @@ static int mdt_hsm_cdt_start(struct mdt_device *mdt)
                       " for registered restore: %d\n",
                       mdt_obd_name(mdt), rc);
 
+       if (mdt->mdt_bottom->dd_rdonly)
+               RETURN(0);
+
        task = kthread_run(mdt_coordinator, cdt_mti, "hsm_cdtr");
        if (IS_ERR(task)) {
                rc = PTR_ERR(task);
@@ -916,6 +919,9 @@ int mdt_hsm_cdt_stop(struct mdt_device *mdt)
        struct mdt_thread_info          *cdt_mti;
        ENTRY;
 
+       if (mdt->mdt_opts.mo_coordinator == 0)
+               RETURN(0);
+
        if (cdt->cdt_state == CDT_STOPPED) {
                CERROR("%s: Coordinator already stopped\n",
                       mdt_obd_name(mdt));
index 4d1fbaf..c732096 100644 (file)
@@ -3641,6 +3641,9 @@ static int mdt_intent_opc(enum ldlm_intent_flags itopc,
                if (qmt == NULL)
                        RETURN(-EOPNOTSUPP);
 
+               if (mdt_rdonly(req->rq_export))
+                       RETURN(-EROFS);
+
                (*lockp)->l_lvb_type = LVB_T_LQUOTA;
                /* pass the request to quota master */
                rc = qmt_hdls.qmth_intent_policy(info->mti_env, qmt,
@@ -3657,8 +3660,7 @@ static int mdt_intent_opc(enum ldlm_intent_flags itopc,
        if (rc < 0)
                RETURN(rc);
 
-       if (flv->it_flags & MUTABOR &&
-           exp_connect_flags(req->rq_export) & OBD_CONNECT_RDONLY)
+       if (flv->it_flags & MUTABOR && mdt_rdonly(req->rq_export))
                RETURN(-EROFS);
 
        if (flv->it_act != NULL) {
@@ -5110,6 +5112,11 @@ static int mdt_connect_internal(struct obd_export *exp,
 
        data->ocd_connect_flags &= MDT_CONNECT_SUPPORTED;
 
+       if (mdt->mdt_bottom->dd_rdonly &&
+           !(data->ocd_connect_flags & OBD_CONNECT_MDS_MDS) &&
+           !(data->ocd_connect_flags & OBD_CONNECT_RDONLY))
+               RETURN(-EACCES);
+
        if (data->ocd_connect_flags & OBD_CONNECT_FLAGS2)
                data->ocd_connect_flags2 &= MDT_CONNECT_SUPPORTED2;
 
@@ -6027,7 +6034,7 @@ static int mdt_postrecov(const struct lu_env *env, struct mdt_device *mdt)
        int rc;
        ENTRY;
 
-       if (!mdt->mdt_skip_lfsck) {
+       if (!mdt->mdt_skip_lfsck && !mdt->mdt_bottom->dd_rdonly) {
                struct lfsck_start_param lsp;
 
                lsp.lsp_start = NULL;
index 23ab4db..22df94c 100644 (file)
@@ -257,8 +257,7 @@ hsm_action_permission(struct mdt_thread_info *mti,
        int rc;
        ENTRY;
 
-       if (hsma != HSMA_RESTORE &&
-           exp_connect_flags(mti->mti_exp) & OBD_CONNECT_RDONLY)
+       if (hsma != HSMA_RESTORE && mdt_rdonly(mti->mti_exp))
                RETURN(-EROFS);
 
        if (md_capable(uc, CFS_CAP_SYS_ADMIN))
index 7b90b58..98ce152 100644 (file)
@@ -908,6 +908,14 @@ static inline struct mdt_device *mdt_exp2dev(struct obd_export *exp)
        return mdt_dev(exp->exp_obd->obd_lu_dev);
 }
 
+static inline bool mdt_rdonly(struct obd_export *exp)
+{
+       if (exp_connect_flags(exp) & OBD_CONNECT_RDONLY ||
+           mdt_exp2dev(exp)->mdt_bottom->dd_rdonly)
+               return true;
+       return false;
+}
+
 typedef void (*mdt_reconstruct_t)(struct mdt_thread_info *mti,
                                   struct mdt_lock_handle *lhc);
 static inline int mdt_check_resent(struct mdt_thread_info *info,
index a6a1846..773a8be 100644 (file)
@@ -196,8 +196,11 @@ static void mdt_empty_transno(struct mdt_thread_info *info, int rc)
         struct ptlrpc_request  *req = mdt_info_req(info);
         struct tg_export_data  *ted;
         struct lsd_client_data *lcd;
-
         ENTRY;
+
+       if (mdt_rdonly(req->rq_export))
+               RETURN_EXIT;
+
         /* transaction has occurred already */
         if (lustre_msg_get_transno(req->rq_repmsg) != 0)
                 RETURN_EXIT;
@@ -1403,7 +1406,7 @@ again:
 
                 if (!(create_flags & MDS_OPEN_CREAT))
                         GOTO(out_parent, result);
-               if (exp_connect_flags(req->rq_export) & OBD_CONNECT_RDONLY)
+               if (mdt_rdonly(req->rq_export))
                        GOTO(out_parent, result = -EROFS);
                 *child_fid = *info->mti_rr.rr_fid2;
                 LASSERTF(fid_is_sane(child_fid), "fid="DFID"\n",
@@ -1678,7 +1681,7 @@ static int mdt_hsm_release(struct mdt_thread_info *info, struct mdt_object *o,
        int                     rc2;
        ENTRY;
 
-       if (exp_connect_flags(info->mti_exp) & OBD_CONNECT_RDONLY)
+       if (mdt_rdonly(info->mti_exp))
                RETURN(-EROFS);
 
        data = req_capsule_client_get(info->mti_pill, &RMF_CLOSE_DATA);
index 12a8751..119c0df 100644 (file)
@@ -1930,7 +1930,7 @@ static int mgc_process_cfg_log(struct obd_device *mgc,
            cli->cl_mgc_configs_dir != NULL &&
            lu2dt_dev(cli->cl_mgc_configs_dir->do_lu.lo_dev) ==
            lsi->lsi_dt_dev) {
-               if (!local_only)
+               if (!local_only && !lsi->lsi_dt_dev->dd_rdonly)
                        /* Only try to copy log if we have the lock. */
                        rc = mgc_llog_local_copy(env, mgc, ctxt, lctxt,
                                                 cld->cld_logname);
@@ -1958,11 +1958,24 @@ static int mgc_process_cfg_log(struct obd_device *mgc,
                        GOTO(out_pop, rc = -EIO);
        }
 
-       /* logname and instance info should be the same, so use our
-        * copy of the instance for the update.  The cfg_last_idx will
-        * be updated here. */
-       rc = class_config_parse_llog(env, ctxt, cld->cld_logname,
-                                    &cld->cld_cfg);
+       rc = -EAGAIN;
+       if (lsi && IS_SERVER(lsi) && !IS_MGS(lsi) &&
+           lsi->lsi_dt_dev->dd_rdonly) {
+               struct llog_ctxt *rctxt;
+
+               /* Under readonly mode, we may have no local copy or local
+                * copy is incomplete, so try to use remote llog firstly. */
+               rctxt = llog_get_context(mgc, LLOG_CONFIG_REPL_CTXT);
+               LASSERT(rctxt);
+
+               rc = class_config_parse_llog(env, rctxt, cld->cld_logname,
+                                            &cld->cld_cfg);
+               llog_ctxt_put(rctxt);
+       }
+
+       if (rc && rc != -ENOENT)
+               rc = class_config_parse_llog(env, ctxt, cld->cld_logname,
+                                            &cld->cld_cfg);
 
        /*
         * update settings on existing OBDs. doing it inside
index 51a9e0b..15102e0 100644 (file)
@@ -460,7 +460,8 @@ static struct fs_db *mgs_new_fsdb(const struct lu_env *env,
                lproc_mgs_add_live(mgs, fsdb);
        }
 
-       if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags)) {
+       if (!test_bit(FSDB_MGS_SELF, &fsdb->fsdb_flags) &&
+           strcmp(PARAMS_FILENAME, fsname) != 0) {
                /* populate the db from the client llog */
                rc = mgs_get_fsdb_from_llog(env, mgs, fsdb);
                if (rc) {
index 2d14d2a..3376fb2 100644 (file)
@@ -220,6 +220,9 @@ static int nidtbl_update_version(const struct lu_env *env,
        int               rc;
         ENTRY;
 
+       if (mgs->mgs_bottom->dd_rdonly)
+               RETURN(0);
+
        LASSERT(mutex_is_locked(&tbl->mn_lock));
 
        fsdb = local_file_find_or_create(env, mgs->mgs_los, mgs->mgs_nidtbl_dir,
index ca30269..a0c6bbd 100644 (file)
@@ -1233,6 +1233,9 @@ static int lmd_parse(char *options, struct lustre_mount_data *lmd)
                } else if (strncmp(s1, "skip_lfsck", 10) == 0) {
                        lmd->lmd_flags |= LMD_FLG_SKIP_LFSCK;
                        clear++;
+               } else if (strncmp(s1, "rdonly_dev", 10) == 0) {
+                       lmd->lmd_flags |= LMD_FLG_DEV_RDONLY;
+                       clear++;
                } else if (strncmp(s1, PARAM_MGSNODE,
                                   sizeof(PARAM_MGSNODE) - 1) == 0) {
                        s2 = s1 + sizeof(PARAM_MGSNODE) - 1;
index 716e602..5662ca7 100644 (file)
@@ -2894,6 +2894,7 @@ static int ofd_init0(const struct lu_env *env, struct ofd_device *m,
        struct obd_statfs *osfs;
        struct lu_fid fid;
        struct nm_config_file *nodemap_config;
+       struct obd_device_target *obt;
        int rc;
 
        ENTRY;
@@ -2908,7 +2909,8 @@ static int ofd_init0(const struct lu_env *env, struct ofd_device *m,
        if (rc != 0)
                RETURN(rc);
 
-       obd->u.obt.obt_magic = OBT_MAGIC;
+       obt = &obd->u.obt;
+       obt->obt_magic = OBT_MAGIC;
 
        m->ofd_fmd_max_num = OFD_FMD_MAX_NUM_DEFAULT;
        m->ofd_fmd_max_age = OFD_FMD_MAX_AGE_DEFAULT;
@@ -3040,10 +3042,13 @@ static int ofd_init0(const struct lu_env *env, struct ofd_device *m,
 
        nodemap_config = nm_config_file_register_tgt(env, m->ofd_osd,
                                                     m->ofd_los);
-       if (IS_ERR(nodemap_config))
-               GOTO(err_fini_los, rc = PTR_ERR(nodemap_config));
-
-       obd->u.obt.obt_nodemap_config_file = nodemap_config;
+       if (IS_ERR(nodemap_config)) {
+               rc = PTR_ERR(nodemap_config);
+               if (rc != -EROFS)
+                       GOTO(err_fini_los, rc);
+       } else {
+               obt->obt_nodemap_config_file = nodemap_config;
+       }
 
        rc = ofd_start_inconsistency_verification_thread(m);
        if (rc != 0)
@@ -3054,8 +3059,8 @@ static int ofd_init0(const struct lu_env *env, struct ofd_device *m,
        RETURN(0);
 
 err_fini_nm:
-       nm_config_file_deregister_tgt(env, obd->u.obt.obt_nodemap_config_file);
-       obd->u.obt.obt_nodemap_config_file = NULL;
+       nm_config_file_deregister_tgt(env, obt->obt_nodemap_config_file);
+       obt->obt_nodemap_config_file = NULL;
 err_fini_los:
        local_oid_storage_fini(env, m->ofd_los);
        m->ofd_los = NULL;
index d8f421c..ea3d87b 100644 (file)
@@ -212,6 +212,9 @@ int ofd_seq_last_oid_write(const struct lu_env *env, struct ofd_device *ofd,
 
        ENTRY;
 
+       if (ofd->ofd_osd->dd_rdonly)
+               RETURN(0);
+
        tmp = cpu_to_le64(ofd_seq_last_oid(oseq));
 
        info->fti_buf.lb_buf = &tmp;
index 4b6034b..35173aa 100644 (file)
@@ -554,7 +554,7 @@ int ofd_postrecov(const struct lu_env *env, struct ofd_device *ofd)
 
        CDEBUG(D_HA, "%s: recovery is over\n", ofd_name(ofd));
 
-       if (!ofd->ofd_skip_lfsck) {
+       if (!ofd->ofd_skip_lfsck && !ofd->ofd_osd->dd_rdonly) {
                struct lfsck_start_param lsp;
 
                lsp.lsp_start = NULL;
index 28f5e37..c9a9ff5 100644 (file)
@@ -1604,6 +1604,14 @@ static struct thandle *osd_trans_create(const struct lu_env *env,
        struct thandle          *th;
        ENTRY;
 
+       if (d->dd_rdonly) {
+               CERROR("%s: someone try to start transaction under "
+                      "readonly mode, should be disabled.\n",
+                      osd_name(osd_dt_dev(d)));
+               dump_stack();
+               RETURN(ERR_PTR(-EROFS));
+       }
+
        /* on pending IO in this thread should left from prev. request */
        LASSERT(atomic_read(&iobuf->dr_numreqs) == 0);
 
@@ -6868,16 +6876,30 @@ static int osd_mount(const struct lu_env *env,
                       "greater than 512TB and can cause data corruption. "
                       "Use \"force_over_512tb\" mount option to override.\n",
                       name, dev);
-               GOTO(out, rc = -EINVAL);
+               GOTO(out_mnt, rc = -EINVAL);
        }
 
+       if (lmd_flags & LMD_FLG_DEV_RDONLY) {
 #ifdef HAVE_DEV_SET_RDONLY
-       if (dev_check_rdonly(o->od_mnt->mnt_sb->s_bdev)) {
-               CERROR("%s: underlying device %s is marked as read-only. "
-                      "Setup failed\n", name, dev);
-               GOTO(out_mnt, rc = -EROFS);
-       }
+               dev_set_rdonly(osd_sb(o)->s_bdev);
+               o->od_dt_dev.dd_rdonly = 1;
+               LCONSOLE_WARN("%s: set dev_rdonly on this device\n", name);
+#else
+               LCONSOLE_WARN("%s: not support dev_rdonly on this device",
+                             name);
+
+               GOTO(out_mnt, rc = -EOPNOTSUPP);
 #endif
+       } else {
+#ifdef HAVE_DEV_SET_RDONLY
+               if (dev_check_rdonly(osd_sb(o)->s_bdev)) {
+                       CERROR("%s: underlying device %s is marked as "
+                              "read-only. Setup failed\n", name, dev);
+
+                       GOTO(out_mnt, rc = -EROFS);
+               }
+#endif
+       }
 
        if (!LDISKFS_HAS_COMPAT_FEATURE(o->od_mnt->mnt_sb,
                                        LDISKFS_FEATURE_COMPAT_HAS_JOURNAL)) {
index 0302222..51f2bb0 100644 (file)
@@ -2572,6 +2572,9 @@ static int do_osd_scrub_start(struct osd_device *dev, __u32 flags)
        int                   rc;
        ENTRY;
 
+       if (dev->od_dt_dev.dd_rdonly)
+               RETURN(-EROFS);
+
        /* os_lock: sync status between stop and scrub thread */
        spin_lock(&scrub->os_lock);
 
@@ -2819,7 +2822,7 @@ int osd_scrub_setup(const struct lu_env *env, struct osd_device *dev)
                 * later if found that the system is upgrading. */
                dev->od_igif_inoi = 1;
 
-       if (!dev->od_noscrub &&
+       if (!dev->od_dt_dev.dd_rdonly && !dev->od_noscrub &&
            ((sf->sf_status == SS_PAUSED) ||
             (sf->sf_status == SS_CRASHED &&
              sf->sf_flags & (SF_RECREATED | SF_INCONSISTENT |
index 5307f92..f57e3a7 100644 (file)
@@ -337,6 +337,14 @@ static struct thandle *osd_trans_create(const struct lu_env *env,
        dmu_tx_t                *tx;
        ENTRY;
 
+       if (dt->dd_rdonly) {
+               CERROR("%s: someone try to start transaction under "
+                      "readonly mode, should be disabled.\n",
+                      osd_name(osd_dt_dev(dt)));
+               dump_stack();
+               RETURN(ERR_PTR(-EROFS));
+       }
+
        tx = dmu_tx_create(osd->od_os);
        if (tx == NULL)
                RETURN(ERR_PTR(-ENOMEM));
@@ -481,7 +489,8 @@ static int osd_objset_statfs(struct osd_device *osd, struct obd_statfs *osfs)
        osfs->os_bavail = osfs->os_bfree; /* no extra root reservation */
 
        /* Take replication (i.e. number of copies) into account */
-       osfs->os_bavail /= os->os_copies;
+       if (os->os_copies != 0)
+               osfs->os_bavail /= os->os_copies;
 
        /*
         * Reserve some space so we don't run into ENOSPC due to grants not
@@ -615,10 +624,14 @@ static void osd_conf_get(const struct lu_env *env,
  */
 static int osd_sync(const struct lu_env *env, struct dt_device *d)
 {
-       struct osd_device  *osd = osd_dt_dev(d);
-       CDEBUG(D_CACHE, "syncing OSD %s\n", LUSTRE_OSD_ZFS_NAME);
-       txg_wait_synced(dmu_objset_pool(osd->od_os), 0ULL);
-       CDEBUG(D_CACHE, "synced OSD %s\n", LUSTRE_OSD_ZFS_NAME);
+       if (!d->dd_rdonly) {
+               struct osd_device  *osd = osd_dt_dev(d);
+
+               CDEBUG(D_CACHE, "syncing OSD %s\n", LUSTRE_OSD_ZFS_NAME);
+               txg_wait_synced(dmu_objset_pool(osd->od_os), 0ULL);
+               CDEBUG(D_CACHE, "synced OSD %s\n", LUSTRE_OSD_ZFS_NAME);
+       }
+
        return 0;
 }
 
@@ -839,11 +852,14 @@ static int osd_objset_open(struct osd_device *o)
        int             rc;
        ENTRY;
 
-       rc = -dmu_objset_own(o->od_mntdev, DMU_OST_ZFS, B_FALSE, o, &o->od_os);
+       rc = -dmu_objset_own(o->od_mntdev, DMU_OST_ZFS,
+                            o->od_dt_dev.dd_rdonly ? B_TRUE : B_FALSE,
+                            o, &o->od_os);
        if (rc) {
                CERROR("%s: can't open %s\n", o->od_svname, o->od_mntdev);
                o->od_os = NULL;
-               goto out;
+
+               GOTO(out, rc);
        }
 
        /* Check ZFS version */
@@ -908,6 +924,14 @@ osd_unlinked_object_free(struct osd_device *osd, uint64_t oid)
        int       rc;
        dmu_tx_t *tx;
 
+       if (osd->od_dt_dev.dd_rdonly) {
+               CERROR("%s: someone try to free objects under "
+                      "readonly mode, should be disabled.\n", osd_name(osd));
+               dump_stack();
+
+               return -EROFS;
+       }
+
        rc = -dmu_free_long_range(osd->od_os, oid, 0, DMU_OBJECT_END);
        if (rc != 0) {
                CWARN("%s: Cannot truncate %llu: rc = %d\n",
@@ -973,6 +997,7 @@ static int osd_mount(const struct lu_env *env,
                     struct osd_device *o, struct lustre_cfg *cfg)
 {
        char                    *mntdev = lustre_cfg_string(cfg, 1);
+       char                    *str    = lustre_cfg_string(cfg, 2);
        char                    *svname = lustre_cfg_string(cfg, 4);
        dnode_t *rootdn;
        const char              *opts;
@@ -993,6 +1018,21 @@ static int osd_mount(const struct lu_env *env,
        if (rc >= sizeof(o->od_svname))
                RETURN(-E2BIG);
 
+       str = strstr(str, ":");
+       if (str) {
+               unsigned long flags;
+
+               rc = kstrtoul(str + 1, 10, &flags);
+               if (rc)
+                       RETURN(-EINVAL);
+
+               if (flags & LMD_FLG_DEV_RDONLY) {
+                       o->od_dt_dev.dd_rdonly = 1;
+                       LCONSOLE_WARN("%s: set dev_rdonly on this device\n",
+                                     svname);
+               }
+       }
+
        if (server_name_is_ost(o->od_svname))
                o->od_is_ost = 1;
 
@@ -1052,7 +1092,7 @@ static int osd_mount(const struct lu_env *env,
 
        osd_unlinked_drain(env, o);
 err:
-       if (rc) {
+       if (rc && o->od_os) {
                dmu_objset_disown(o->od_os, o);
                o->od_os = NULL;
        }
@@ -1075,8 +1115,9 @@ static void osd_umount(const struct lu_env *env, struct osd_device *o)
                       atomic_read(&o->od_zerocopy_pin));
 
        if (o->od_os != NULL) {
-               /* force a txg sync to get all commit callbacks */
-               txg_wait_synced(dmu_objset_pool(o->od_os), 0ULL);
+               if (!o->od_dt_dev.dd_rdonly)
+                       /* force a txg sync to get all commit callbacks */
+                       txg_wait_synced(dmu_objset_pool(o->od_os), 0ULL);
 
                /* close the object set */
                dmu_objset_disown(o->od_os, o);
@@ -1177,8 +1218,11 @@ static struct lu_device *osd_device_fini(const struct lu_env *env,
 
        if (o->od_os) {
                osd_objset_unregister_callbacks(o);
-               osd_sync(env, lu2dt_dev(d));
-               txg_wait_callbacks(spa_get_dsl(dmu_objset_spa(o->od_os)));
+               if (!o->od_dt_dev.dd_rdonly) {
+                       osd_sync(env, lu2dt_dev(d));
+                       txg_wait_callbacks(
+                                       spa_get_dsl(dmu_objset_spa(o->od_os)));
+               }
        }
 
        rc = osd_procfs_fini(o);
index e7122e1..203a3fd 100644 (file)
@@ -1401,7 +1401,8 @@ static struct dt_it *osd_zfs_otable_it_init(const struct lu_env *env,
        /* XXX: dmu_object_next() does NOT find dnodes allocated
         *      in the current non-committed txg, so we force txg
         *      commit to find all existing dnodes ... */
-       txg_wait_synced(dmu_objset_pool(dev->od_os), 0ULL);
+       if (!dev->od_dt_dev.dd_rdonly)
+               txg_wait_synced(dmu_objset_pool(dev->od_os), 0ULL);
 
        RETURN((struct dt_it *)it);
 }
index 473211a..2e64f18 100644 (file)
@@ -1729,7 +1729,8 @@ static int osd_object_sync(const struct lu_env *env, struct dt_object *dt,
         * support ZIL.  If the object tracked the txg that it was last
         * modified in, it could pass that txg here instead of "0".  Maybe
         * the changes are already committed, so no wait is needed at all? */
-       txg_wait_synced(dmu_objset_pool(osd->od_os), 0ULL);
+       if (!osd->od_dt_dev.dd_rdonly)
+               txg_wait_synced(dmu_objset_pool(osd->od_os), 0ULL);
 
        RETURN(0);
 }
index 08aa27f..0f0526b 100644 (file)
@@ -148,6 +148,9 @@ osd_oi_create(const struct lu_env *env, struct osd_device *o,
        if (rc == 0)
                return -EEXIST;
 
+       if (o->od_dt_dev.dd_rdonly)
+               return -EROFS;
+
        /* create fid-to-dnode index */
        tx = dmu_tx_create(o->od_os);
        if (tx == NULL)
index 138bb43..bb8b4b1 100644 (file)
@@ -211,6 +211,9 @@ static int osp_write_local_file(const struct lu_env *env,
        struct thandle *th;
        int rc;
 
+       if (osp->opd_storage->dd_rdonly)
+               RETURN(0);
+
        th = dt_trans_create(env, osp->opd_storage);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
@@ -500,6 +503,9 @@ static int osp_update_init(struct osp_device *osp)
 
        LASSERT(osp->opd_connect_mdt);
 
+       if (osp->opd_storage->dd_rdonly)
+               RETURN(0);
+
        OBD_ALLOC_PTR(osp->opd_update);
        if (osp->opd_update == NULL)
                RETURN(-ENOMEM);
@@ -1163,10 +1169,11 @@ static int osp_init0(const struct lu_env *env, struct osp_device *osp,
        if (!osp->opd_connect_mdt) {
                /* Initialize last id from the storage - will be
                 * used in orphan cleanup. */
-               rc = osp_last_used_init(env, osp);
-               if (rc)
-                       GOTO(out_fid, rc);
-
+               if (!osp->opd_storage->dd_rdonly) {
+                       rc = osp_last_used_init(env, osp);
+                       if (rc)
+                               GOTO(out_fid, rc);
+               }
 
                /* Initialize precreation thread, it handles new
                 * connections as well. */
index ae7e4ef..c85b837 100644 (file)
@@ -354,6 +354,9 @@ int osp_write_last_oid_seq_files(struct lu_env *env, struct osp_device *osp,
        int                   rc;
        ENTRY;
 
+       if (osp->opd_storage->dd_rdonly)
+               RETURN(0);
+
        /* Note: through f_oid is only 32 bits, it will also write 64 bits
         * for oid to keep compatibility with the previous version. */
        lb_oid->lb_buf = &fid->f_oid;
@@ -1150,6 +1153,12 @@ static int osp_precreate_thread(void *_arg)
        if (rc) {
                CERROR("%s: init env error: rc = %d\n", d->opd_obd->obd_name,
                       rc);
+
+               spin_lock(&d->opd_pre_lock);
+               thread->t_flags = SVC_STOPPED;
+               spin_unlock(&d->opd_pre_lock);
+               wake_up(&thread->t_ctl_waitq);
+
                RETURN(rc);
        }
 
@@ -1660,6 +1669,7 @@ int osp_init_precreate(struct osp_device *d)
        spin_lock_init(&d->opd_pre_lock);
        init_waitqueue_head(&d->opd_pre_waitq);
        init_waitqueue_head(&d->opd_pre_user_waitq);
+       thread_set_flags(&d->opd_pre_thread, SVC_INIT);
        init_waitqueue_head(&d->opd_pre_thread.t_ctl_waitq);
 
        /*
@@ -1673,6 +1683,9 @@ int osp_init_precreate(struct osp_device *d)
        setup_timer(&d->opd_statfs_timer, osp_statfs_timer_cb,
                    (unsigned long)d);
 
+       if (d->opd_storage->dd_rdonly)
+               RETURN(0);
+
        /*
         * start thread handling precreation and statfs updates
         */
@@ -1701,8 +1714,7 @@ int osp_init_precreate(struct osp_device *d)
  */
 void osp_precreate_fini(struct osp_device *d)
 {
-       struct ptlrpc_thread *thread;
-
+       struct ptlrpc_thread *thread = &d->opd_pre_thread;
        ENTRY;
 
        del_timer(&d->opd_statfs_timer);
@@ -1710,12 +1722,11 @@ void osp_precreate_fini(struct osp_device *d)
        if (d->opd_pre == NULL)
                RETURN_EXIT;
 
-       thread = &d->opd_pre_thread;
-
-       thread->t_flags = SVC_STOPPING;
-       wake_up(&d->opd_pre_waitq);
-
-       wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
+       if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
+               thread->t_flags = SVC_STOPPING;
+               wake_up(&d->opd_pre_waitq);
+               wait_event(thread->t_ctl_waitq, thread_is_stopped(thread));
+       }
 
        OBD_FREE_PTR(d->opd_pre);
        d->opd_pre = NULL;
index ffe2db1..2c3d999 100644 (file)
@@ -1169,6 +1169,12 @@ static int osp_sync_thread(void *_arg)
        if (rc) {
                CERROR("%s: can't initialize env: rc = %d\n",
                       obd->obd_name, rc);
+
+               spin_lock(&d->opd_syn_lock);
+               thread->t_flags = SVC_STOPPED;
+               spin_unlock(&d->opd_syn_lock);
+               wake_up(&thread->t_ctl_waitq);
+
                RETURN(rc);
        }
 
@@ -1383,9 +1389,10 @@ static void osp_sync_llog_fini(const struct lu_env *env, struct osp_device *d)
        struct llog_ctxt *ctxt;
 
        ctxt = llog_get_context(d->opd_obd, LLOG_MDS_OST_ORIG_CTXT);
-       if (ctxt != NULL)
+       if (ctxt) {
                llog_cat_close(env, ctxt->loc_handle);
-       llog_cleanup(env, ctxt);
+               llog_cleanup(env, ctxt);
+       }
 }
 
 /**
@@ -1408,6 +1415,19 @@ int osp_sync_init(const struct lu_env *env, struct osp_device *d)
 
        ENTRY;
 
+       d->opd_syn_max_rpc_in_flight = OSP_MAX_IN_FLIGHT;
+       d->opd_syn_max_rpc_in_progress = OSP_MAX_IN_PROGRESS;
+       spin_lock_init(&d->opd_syn_lock);
+       init_waitqueue_head(&d->opd_syn_waitq);
+       init_waitqueue_head(&d->opd_syn_barrier_waitq);
+       thread_set_flags(&d->opd_syn_thread, SVC_INIT);
+       init_waitqueue_head(&d->opd_syn_thread.t_ctl_waitq);
+       INIT_LIST_HEAD(&d->opd_syn_inflight_list);
+       INIT_LIST_HEAD(&d->opd_syn_committed_there);
+
+       if (d->opd_storage->dd_rdonly)
+               RETURN(0);
+
        rc = osp_sync_id_traction_init(d);
        if (rc)
                RETURN(rc);
@@ -1425,15 +1445,6 @@ int osp_sync_init(const struct lu_env *env, struct osp_device *d)
        /*
         * Start synchronization thread
         */
-       d->opd_syn_max_rpc_in_flight = OSP_MAX_IN_FLIGHT;
-       d->opd_syn_max_rpc_in_progress = OSP_MAX_IN_PROGRESS;
-       spin_lock_init(&d->opd_syn_lock);
-       init_waitqueue_head(&d->opd_syn_waitq);
-       init_waitqueue_head(&d->opd_syn_barrier_waitq);
-       init_waitqueue_head(&d->opd_syn_thread.t_ctl_waitq);
-       INIT_LIST_HEAD(&d->opd_syn_inflight_list);
-       INIT_LIST_HEAD(&d->opd_syn_committed_there);
-
        task = kthread_run(osp_sync_thread, d, "osp-syn-%u-%u",
                           d->opd_index, d->opd_group);
        if (IS_ERR(task)) {
@@ -1469,9 +1480,11 @@ int osp_sync_fini(struct osp_device *d)
 
        ENTRY;
 
-       thread->t_flags = SVC_STOPPING;
-       wake_up(&d->opd_syn_waitq);
-       wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
+       if (!thread_is_init(thread) && !thread_is_stopped(thread)) {
+               thread->t_flags = SVC_STOPPING;
+               wake_up(&d->opd_syn_waitq);
+               wait_event(thread->t_ctl_waitq, thread_is_stopped(thread));
+       }
 
        /*
         * unregister transaction callbacks only when sync thread
index a21764a..4855df7 100644 (file)
@@ -162,22 +162,32 @@ static struct dt_object *nodemap_cache_find_create(const struct lu_env *env,
                                                   struct local_oid_storage *los,
                                                   enum ncfc_find_create create_new)
 {
-       struct lu_fid root_fid;
+       struct lu_fid tfid;
        struct dt_object *root_obj;
        struct dt_object *nm_obj;
        int rc = 0;
 
-       rc = dt_root_get(env, dev, &root_fid);
+       rc = dt_root_get(env, dev, &tfid);
        if (rc < 0)
                GOTO(out, nm_obj = ERR_PTR(rc));
 
-       root_obj = dt_locate(env, dev, &root_fid);
+       root_obj = dt_locate(env, dev, &tfid);
        if (unlikely(IS_ERR(root_obj)))
                GOTO(out, nm_obj = root_obj);
 
+       rc = dt_lookup_dir(env, root_obj, LUSTRE_NODEMAP_NAME, &tfid);
+       if (rc == -ENOENT) {
+               if (dev->dd_rdonly)
+                       GOTO(out_root, nm_obj = ERR_PTR(-EROFS));
+       } else if (rc) {
+               GOTO(out_root, nm_obj = ERR_PTR(rc));
+       } else if (dev->dd_rdonly && create_new == NCFC_CREATE_NEW) {
+               GOTO(out_root, nm_obj = ERR_PTR(-EROFS));
+       }
+
 again:
        /* if loading index fails the first time, create new index */
-       if (create_new == NCFC_CREATE_NEW) {
+       if (create_new == NCFC_CREATE_NEW && rc != -ENOENT) {
                CDEBUG(D_INFO, "removing old index, creating new one\n");
                rc = local_object_unlink(env, dev, root_obj,
                                         LUSTRE_NODEMAP_NAME);
@@ -954,7 +964,7 @@ struct dt_object *nodemap_save_config_cache(const struct lu_env *env,
        /* create a new index file to fill with active config */
        o = nodemap_cache_find_create(env, dev, los, NCFC_CREATE_NEW);
        if (IS_ERR(o))
-               GOTO(out, o);
+               RETURN(o);
 
        mutex_lock(&active_config_lock);
 
@@ -1019,7 +1029,6 @@ struct dt_object *nodemap_save_config_cache(const struct lu_env *env,
        if (rc2 < 0)
                rc = rc2;
 
-out:
        mutex_unlock(&active_config_lock);
 
        if (rc < 0) {
index 7c41c1a..ae4c97e 100644 (file)
@@ -244,11 +244,13 @@ static int qmt_device_init0(const struct lu_env *env, struct qmt_device *qmt,
        init_waitqueue_head(&qmt->qmt_reba_thread.t_ctl_waitq);
        INIT_LIST_HEAD(&qmt->qmt_reba_list);
        spin_lock_init(&qmt->qmt_reba_lock);
-       rc = qmt_start_reba_thread(qmt);
-       if (rc) {
-               CERROR("%s: failed to start rebalance thread (%d)\n",
-                      qmt->qmt_svname, rc);
-               GOTO(out, rc);
+       if (!qmt->qmt_child->dd_rdonly) {
+               rc = qmt_start_reba_thread(qmt);
+               if (rc) {
+                       CERROR("%s: failed to start rebalance thread (%d)\n",
+                              qmt->qmt_svname, rc);
+                       GOTO(out, rc);
+               }
        }
 
        /* at the moment there is no linkage between lu_type and obd_type, so
index b90ecc0..0982e93 100644 (file)
@@ -833,6 +833,9 @@ int qsd_op_begin(const struct lu_env *env, struct qsd_instance *qsd,
        if (unlikely(qsd == NULL))
                RETURN(0);
 
+       if (qsd->qsd_dev->dd_rdonly)
+               RETURN(0);
+
        /* We don't enforce quota until the qsd_instance is started */
        read_lock(&qsd->qsd_lock);
        if (!qsd->qsd_started) {
@@ -922,6 +925,9 @@ int qsd_adjust(const struct lu_env *env, struct lquota_entry *lqe)
        qqi = lqe2qqi(lqe);
        qsd = qqi->qqi_qsd;
 
+       if (qsd->qsd_dev->dd_rdonly)
+               RETURN(0);
+
        lqe_write_lock(lqe);
 
        /* fill qb_count & qb_flags */
@@ -1075,6 +1081,9 @@ void qsd_op_end(const struct lu_env *env, struct qsd_instance *qsd,
        if (unlikely(qsd == NULL))
                RETURN_EXIT;
 
+       if (qsd->qsd_dev->dd_rdonly)
+               RETURN_EXIT;
+
        /* We don't enforce quota until the qsd_instance is started */
        read_lock(&qsd->qsd_lock);
        if (!qsd->qsd_started) {
index a6b8b9c..6a60263 100644 (file)
@@ -508,9 +508,11 @@ void qsd_fini(const struct lu_env *env, struct qsd_instance *qsd)
        for (qtype = USRQUOTA; qtype < LL_MAXQUOTAS; qtype++)
                qsd_qtype_fini(env, qsd, qtype);
 
-       /* deregister connection to the quota master */
-       qsd->qsd_exp_valid = false;
-       lustre_deregister_lwp_item(&qsd->qsd_exp);
+       if (qsd->qsd_exp) {
+               /* deregister connection to the quota master */
+               qsd->qsd_exp_valid = false;
+               lustre_deregister_lwp_item(&qsd->qsd_exp);
+       }
 
        /* release per-filesystem information */
        if (qsd->qsd_fsinfo != NULL) {
@@ -703,6 +705,9 @@ int qsd_prepare(const struct lu_env *env, struct qsd_instance *qsd)
        qsd->qsd_prepared = true;
        write_unlock(&qsd->qsd_lock);
 
+       if (qsd->qsd_dev->dd_rdonly)
+               RETURN(0);
+
        /* start reintegration thread for each type, if required */
        for (qtype = USRQUOTA; qtype < LL_MAXQUOTAS; qtype++) {
                struct qsd_qtype_info   *qqi = qsd->qsd_type_array[qtype];
index f5b8f09..b355eb8 100644 (file)
@@ -631,6 +631,9 @@ int qsd_start_reint_thread(struct qsd_qtype_info *qqi)
        char                    *name;
        ENTRY;
 
+       if (qsd->qsd_dev->dd_rdonly)
+               RETURN(0);
+
        /* don't bother to do reintegration when quota isn't enabled */
        if (!qsd_type_enabled(qsd, qqi->qqi_qtype))
                RETURN(0);
index 1cf8c21..f633fef 100644 (file)
@@ -1109,6 +1109,9 @@ int out_handle(struct tgt_session_info *tsi)
                                                     out_reconstruct, reply,
                                                     reply_index))
                                        GOTO(next, rc = 0);
+
+                               if (dt->dd_rdonly)
+                                       GOTO(next, rc = -EROFS);
                        }
 
                        /* start transaction for modification RPC only */
index ea70037..b869298 100644 (file)
@@ -214,6 +214,9 @@ static int tgt_reply_header_write(const struct lu_env *env,
                tgt->lut_obd->obd_name, REPLY_DATA,
                lrh->lrh_magic, lrh->lrh_header_size, lrh->lrh_reply_size);
 
+       if (tgt->lut_bottom->dd_rdonly)
+               RETURN(0);
+
        buf.lrh_magic = cpu_to_le32(lrh->lrh_magic);
        buf.lrh_header_size = cpu_to_le32(lrh->lrh_header_size);
        buf.lrh_reply_size = cpu_to_le32(lrh->lrh_reply_size);
@@ -444,8 +447,9 @@ void tgt_client_free(struct obd_export *exp)
 }
 EXPORT_SYMBOL(tgt_client_free);
 
-int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
-                        struct lsd_client_data *lcd, loff_t *off, int index)
+static int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
+                               struct lsd_client_data *lcd,
+                               loff_t *off, int index)
 {
        struct tgt_thread_info  *tti = tgt_th_info(env);
        int                      rc;
@@ -471,9 +475,10 @@ int tgt_client_data_read(const struct lu_env *env, struct lu_target *tgt,
        return rc;
 }
 
-int tgt_client_data_write(const struct lu_env *env, struct lu_target *tgt,
-                         struct lsd_client_data *lcd, loff_t *off,
-                         struct thandle *th)
+static int tgt_client_data_write(const struct lu_env *env,
+                                struct lu_target *tgt,
+                                struct lsd_client_data *lcd,
+                                loff_t *off, struct thandle *th)
 {
        struct tgt_thread_info *tti = tgt_th_info(env);
        struct dt_object        *dto;
@@ -488,6 +493,59 @@ int tgt_client_data_write(const struct lu_env *env, struct lu_target *tgt,
        return dt_record_write(env, dto, &tti->tti_buf, off, th);
 }
 
+struct tgt_new_client_callback {
+       struct dt_txn_commit_cb  lncc_cb;
+       struct obd_export       *lncc_exp;
+};
+
+static void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
+                             struct dt_txn_commit_cb *cb, int err)
+{
+       struct tgt_new_client_callback *ccb;
+
+       ccb = container_of0(cb, struct tgt_new_client_callback, lncc_cb);
+
+       LASSERT(ccb->lncc_exp->exp_obd);
+
+       CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
+              ccb->lncc_exp->exp_obd->obd_name,
+              ccb->lncc_exp->exp_client_uuid.uuid);
+
+       spin_lock(&ccb->lncc_exp->exp_lock);
+
+       ccb->lncc_exp->exp_need_sync = 0;
+
+       spin_unlock(&ccb->lncc_exp->exp_lock);
+       class_export_cb_put(ccb->lncc_exp);
+
+       OBD_FREE_PTR(ccb);
+}
+
+int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
+{
+       struct tgt_new_client_callback  *ccb;
+       struct dt_txn_commit_cb         *dcb;
+       int                              rc;
+
+       OBD_ALLOC_PTR(ccb);
+       if (ccb == NULL)
+               return -ENOMEM;
+
+       ccb->lncc_exp = class_export_cb_get(exp);
+
+       dcb = &ccb->lncc_cb;
+       dcb->dcb_func = tgt_cb_new_client;
+       INIT_LIST_HEAD(&dcb->dcb_linkage);
+       strlcpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
+
+       rc = dt_trans_cb_add(th, dcb);
+       if (rc) {
+               class_export_cb_put(exp);
+               OBD_FREE_PTR(ccb);
+       }
+       return rc;
+}
+
 /**
  * Update client data in last_rcvd
  */
@@ -508,6 +566,9 @@ static int tgt_client_data_update(const struct lu_env *env,
                RETURN(-EINVAL);
        }
 
+       if (tgt->lut_bottom->dd_rdonly)
+               RETURN(0);
+
        th = dt_trans_create(env, tgt->lut_bottom);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
@@ -556,7 +617,7 @@ out:
        return rc;
 }
 
-int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
+static int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
 {
        struct tgt_thread_info  *tti = tgt_th_info(env);
        int                      rc;
@@ -574,8 +635,8 @@ int tgt_server_data_read(const struct lu_env *env, struct lu_target *tgt)
         return rc;
 }
 
-int tgt_server_data_write(const struct lu_env *env, struct lu_target *tgt,
-                         struct thandle *th)
+static int tgt_server_data_write(const struct lu_env *env,
+                                struct lu_target *tgt, struct thandle *th)
 {
        struct tgt_thread_info  *tti = tgt_th_info(env);
        struct dt_object        *dto;
@@ -619,6 +680,9 @@ int tgt_server_data_update(const struct lu_env *env, struct lu_target *tgt,
        tgt->lut_lsd.lsd_last_transno = tgt->lut_last_transno;
        spin_unlock(&tgt->lut_translock);
 
+       if (tgt->lut_bottom->dd_rdonly)
+               RETURN(0);
+
        th = dt_trans_create(env, tgt->lut_bottom);
        if (IS_ERR(th))
                RETURN(PTR_ERR(th));
@@ -646,8 +710,8 @@ out:
 }
 EXPORT_SYMBOL(tgt_server_data_update);
 
-int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tgt,
-                          loff_t size)
+static int tgt_truncate_last_rcvd(const struct lu_env *env,
+                                 struct lu_target *tgt, loff_t size)
 {
        struct dt_object *dt = tgt->lut_last_rcvd;
        struct thandle   *th;
@@ -656,6 +720,9 @@ int tgt_truncate_last_rcvd(const struct lu_env *env, struct lu_target *tgt,
 
        ENTRY;
 
+       if (tgt->lut_bottom->dd_rdonly)
+               RETURN(0);
+
        attr.la_size = size;
        attr.la_valid = LA_SIZE;
 
@@ -817,8 +884,8 @@ out:
  * Add commit callback function, it returns a non-zero value to inform
  * caller to use sync transaction if necessary.
  */
-int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
-                          struct obd_export *exp, __u64 transno)
+static int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
+                                 struct obd_export *exp, __u64 transno)
 {
        struct tgt_last_committed_callback      *ccb;
        struct dt_txn_commit_cb                 *dcb;
@@ -852,59 +919,6 @@ int tgt_last_commit_cb_add(struct thandle *th, struct lu_target *tgt,
        return rc ? rc : exp->exp_need_sync;
 }
 
-struct tgt_new_client_callback {
-       struct dt_txn_commit_cb  lncc_cb;
-       struct obd_export       *lncc_exp;
-};
-
-static void tgt_cb_new_client(struct lu_env *env, struct thandle *th,
-                             struct dt_txn_commit_cb *cb, int err)
-{
-       struct tgt_new_client_callback *ccb;
-
-       ccb = container_of0(cb, struct tgt_new_client_callback, lncc_cb);
-
-       LASSERT(ccb->lncc_exp->exp_obd);
-
-       CDEBUG(D_RPCTRACE, "%s: committing for initial connect of %s\n",
-              ccb->lncc_exp->exp_obd->obd_name,
-              ccb->lncc_exp->exp_client_uuid.uuid);
-
-       spin_lock(&ccb->lncc_exp->exp_lock);
-
-       ccb->lncc_exp->exp_need_sync = 0;
-
-       spin_unlock(&ccb->lncc_exp->exp_lock);
-       class_export_cb_put(ccb->lncc_exp);
-
-       OBD_FREE_PTR(ccb);
-}
-
-int tgt_new_client_cb_add(struct thandle *th, struct obd_export *exp)
-{
-       struct tgt_new_client_callback  *ccb;
-       struct dt_txn_commit_cb         *dcb;
-       int                              rc;
-
-       OBD_ALLOC_PTR(ccb);
-       if (ccb == NULL)
-               return -ENOMEM;
-
-       ccb->lncc_exp = class_export_cb_get(exp);
-
-       dcb = &ccb->lncc_cb;
-       dcb->dcb_func = tgt_cb_new_client;
-       INIT_LIST_HEAD(&dcb->dcb_linkage);
-       strlcpy(dcb->dcb_name, "tgt_cb_new_client", sizeof(dcb->dcb_name));
-
-       rc = dt_trans_cb_add(th, dcb);
-       if (rc) {
-               class_export_cb_put(exp);
-               OBD_FREE_PTR(ccb);
-       }
-       return rc;
-}
-
 /**
  * Add new client to the last_rcvd upon new connection.
  *
@@ -1419,6 +1433,9 @@ static int tgt_clients_data_init(const struct lu_env *env,
 
        ENTRY;
 
+       if (tgt->lut_bottom->dd_rdonly)
+               RETURN(0);
+
        CLASSERT(offsetof(struct lsd_client_data, lcd_padding) +
                 sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
 
@@ -1618,12 +1635,23 @@ int tgt_server_data_init(const struct lu_env *env, struct lu_target *tgt)
                        RETURN(rc);
                }
                if (strcmp(lsd->lsd_uuid, tgt->lut_obd->obd_uuid.uuid)) {
-                       LCONSOLE_ERROR_MSG(0x157, "Trying to start OBD %s "
-                                          "using the wrong disk %s. Were the"
-                                          " /dev/ assignments rearranged?\n",
-                                          tgt->lut_obd->obd_uuid.uuid,
-                                          lsd->lsd_uuid);
-                       RETURN(-EINVAL);
+                       if (tgt->lut_bottom->dd_rdonly) {
+                               /* Such difference may be caused by mounting
+                                * up snapshot with new fsname under rd_only
+                                * mode. But even if it was NOT, it will not
+                                * damage the system because of "rd_only". */
+                               memcpy(lsd->lsd_uuid,
+                                      tgt->lut_obd->obd_uuid.uuid,
+                                      sizeof(lsd->lsd_uuid));
+                       } else {
+                               LCONSOLE_ERROR_MSG(0x157, "Trying to start "
+                                                  "OBD %s using the wrong "
+                                                  "disk %s. Were the /dev/ "
+                                                  "assignments rearranged?\n",
+                                                  tgt->lut_obd->obd_uuid.uuid,
+                                                  lsd->lsd_uuid);
+                               RETURN(-EINVAL);
+                       }
                }
 
                if (lsd->lsd_osd_index != index) {
@@ -1740,6 +1768,14 @@ int tgt_txn_start_cb(const struct lu_env *env, struct thandle *th,
        struct dt_object        *dto;
        int                      rc;
 
+       /* For readonly case, the caller should have got failure
+        * when start the transaction. If the logic comes here,
+        * there must be something wrong. */
+       if (unlikely(tgt->lut_bottom->dd_rdonly)) {
+               dump_stack();
+               LBUG();
+       }
+
        /* if there is no session, then this transaction is not result of
         * request processing but some local operation */
        if (env->le_ses == NULL)
index 9fccd3b..2e4892b 100644 (file)
@@ -1701,6 +1701,9 @@ int distribute_txn_init(const struct lu_env *env,
        atomic_set(&tdtd->tdtd_recovery_threads_count, 0);
 
        tdtd->tdtd_lut = lut;
+       if (lut->lut_bottom->dd_rdonly)
+               RETURN(0);
+
        rc = distribute_txn_commit_batchid_init(env, tdtd);
        if (rc != 0)
                RETURN(rc);
index 839469c..91e20a1 100755 (executable)
@@ -16128,6 +16128,59 @@ test_801c() {
 }
 run_test 801c "rescan barrier bitmap"
 
+saved_MGS_MOUNT_OPTS=$MGS_MOUNT_OPTS
+saved_MDS_MOUNT_OPTS=$MDS_MOUNT_OPTS
+saved_OST_MOUNT_OPTS=$OST_MOUNT_OPTS
+
+cleanup_802() {
+       trap 0
+
+       stopall
+       MGS_MOUNT_OPTS=$saved_MGS_MOUNT_OPTS
+       MDS_MOUNT_OPTS=$saved_MDS_MOUNT_OPTS
+       OST_MOUNT_OPTS=$saved_OST_MOUNT_OPTS
+       setupall
+}
+
+test_802() {
+       mkdir $DIR/$tdir || error "(1) fail to mkdir"
+
+       cp $LUSTRE/tests/test-framework.sh $DIR/$tdir/ ||
+               error "(2) Fail to copy"
+
+       trap cleanup_802 EXIT
+
+       # sync by force before remount as readonly
+       sync; sync_all_data; sleep 3; sync_all_data
+
+       stopall
+
+       MGS_MOUNT_OPTS=$(csa_add "$MGS_MOUNT_OPTS" -o rdonly_dev)
+       MDS_MOUNT_OPTS=$(csa_add "$MDS_MOUNT_OPTS" -o rdonly_dev)
+       OST_MOUNT_OPTS=$(csa_add "$OST_MOUNT_OPTS" -o rdonly_dev)
+
+       echo "Mount the server as read only"
+       setupall server_only || error "(3) Fail to start servers"
+
+       echo "Mount client without ro should fail"
+       mount_client $MOUNT &&
+               error "(4) Mount client without 'ro' should fail"
+
+       echo "Mount client with ro should succeed"
+       mount_client $MOUNT ro ||
+               error "(5) Mount client with 'ro' should succeed"
+
+       echo "Modify should be refused"
+       touch $DIR/$tdir/guard && error "(6) Touch should fail under ro mode"
+
+       echo "Read should be allowed"
+       diff $LUSTRE/tests/test-framework.sh $DIR/$tdir/test-framework.sh ||
+               error "(7) Read should succeed under ro mode"
+
+       cleanup_802
+}
+run_test 802 "simulate readonly device"
+
 #
 # tests that do cleanup/setup should be run at the end
 #
index 39a0990..623caca 100755 (executable)
@@ -3684,6 +3684,8 @@ writeconf_all () {
 }
 
 setupall() {
+       local arg1=$1
+
        nfs_client_mode && return
        cifs_client_mode && return
 
@@ -3736,6 +3738,11 @@ setupall() {
     fi
 
     [ "$DAEMONFILE" ] && $LCTL debug_daemon start $DAEMONFILE $DAEMONSIZE
+
+       if [ ! -z $arg1 ]; then
+               [ "$arg1" = "server_only" ] && return
+       fi
+
     mount_client $MOUNT
     [ -n "$CLIENTS" ] && zconf_mount_clients $CLIENTS $MOUNT
     clients_up
index 0f30e13..be29c3f 100644 (file)
@@ -389,8 +389,11 @@ int zfs_read_ldd(char *ds,  struct lustre_disk_data *ldd)
                return EINVAL;
 
        zhp = zfs_open(g_zfs, ds, ZFS_TYPE_FILESYSTEM);
-       if (zhp == NULL)
-               goto out;
+       if (!zhp) {
+               zhp = zfs_open(g_zfs, ds, ZFS_TYPE_SNAPSHOT);
+               if (!zhp)
+                       goto out;
+       }
 
        for (i = 0; special_ldd_prop_params[i].zlpb_prop_name != NULL; i++) {
                bridge = &special_ldd_prop_params[i];