Whamcloud - gitweb
LU-1267 lfsck: rebuild LAST_ID
[fs/lustre-release.git] / lustre / ofd / ofd_dev.c
index afe056e..ea5d166 100644 (file)
@@ -381,6 +381,38 @@ static struct lu_object *ofd_object_alloc(const struct lu_env *env,
 
 extern int ost_handle(struct ptlrpc_request *req);
 
+static int ofd_lfsck_out_notify(const struct lu_env *env, void *data,
+                               enum lfsck_events event)
+{
+       struct ofd_device *ofd = data;
+       struct obd_device *obd = ofd_obd(ofd);
+
+       switch (event) {
+       case LE_LASTID_REBUILDING:
+               CWARN("%s: Found crashed LAST_ID, deny creating new OST-object "
+                     "on the device until the LAST_ID rebuilt successfully.\n",
+                     obd->obd_name);
+               down_write(&ofd->ofd_lastid_rwsem);
+               ofd->ofd_lastid_rebuilding = 1;
+               up_write(&ofd->ofd_lastid_rwsem);
+               break;
+       case LE_LASTID_REBUILT: {
+               down_write(&ofd->ofd_lastid_rwsem);
+               ofd_seqs_free(env, ofd);
+               ofd->ofd_lastid_rebuilding = 0;
+               ofd->ofd_lastid_gen++;
+               up_write(&ofd->ofd_lastid_rwsem);
+               break;
+       }
+       default:
+               CERROR("%s: unknown lfsck event: rc = %d\n",
+                      ofd_obd(ofd)->obd_name, event);
+               return -EINVAL;
+       }
+
+       return 0;
+}
+
 static int ofd_prepare(const struct lu_env *env, struct lu_device *pdev,
                       struct lu_device *dev)
 {
@@ -402,7 +434,8 @@ static int ofd_prepare(const struct lu_env *env, struct lu_device *pdev,
        if (rc != 0)
                RETURN(rc);
 
-       rc = lfsck_register(env, ofd->ofd_osd, &ofd->ofd_dt_dev, false);
+       rc = lfsck_register(env, ofd->ofd_osd, ofd->ofd_osd,
+                           ofd_lfsck_out_notify, ofd, false);
        if (rc != 0) {
                CERROR("%s: failed to initialize lfsck: rc = %d\n",
                       obd->obd_name, rc);
@@ -1181,7 +1214,8 @@ static int ofd_create_hdl(struct tgt_session_info *tsi)
        struct ost_body         *repbody;
        const struct obdo       *oa = &tsi->tsi_ost_body->oa;
        struct obdo             *rep_oa;
-       struct ofd_device       *ofd = ofd_exp(tsi->tsi_exp);
+       struct obd_export       *exp = tsi->tsi_exp;
+       struct ofd_device       *ofd = ofd_exp(exp);
        obd_seq                  seq = ostid_seq(&oa->o_oi);
        obd_id                   oid = ostid_id(&oa->o_oi);
        struct ofd_seq          *oseq;
@@ -1197,6 +1231,13 @@ static int ofd_create_hdl(struct tgt_session_info *tsi)
        if (repbody == NULL)
                RETURN(-ENOMEM);
 
+       down_read(&ofd->ofd_lastid_rwsem);
+       /* Currently, for safe, we do not distinguish which LAST_ID is broken,
+        * we may do that in the future.
+        * Return -ENOSPC until the LAST_ID rebuilt. */
+       if (unlikely(ofd->ofd_lastid_rebuilding))
+               GOTO(out_sem, rc = -ENOSPC);
+
        rep_oa = &repbody->oa;
        rep_oa->o_oi = oa->o_oi;
 
@@ -1209,7 +1250,7 @@ static int ofd_create_hdl(struct tgt_session_info *tsi)
        if (IS_ERR(oseq)) {
                CERROR("%s: Can't find FID Sequence "LPX64": rc = %ld\n",
                       ofd_name(ofd), seq, PTR_ERR(oseq));
-               RETURN(-EINVAL);
+               GOTO(out_sem, rc = -EINVAL);
        }
 
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
@@ -1228,9 +1269,11 @@ static int ofd_create_hdl(struct tgt_session_info *tsi)
        /* former ofd_handle_precreate */
        if ((oa->o_valid & OBD_MD_FLFLAGS) &&
            (oa->o_flags & OBD_FL_DELORPHAN)) {
+               exp->exp_filter_data.fed_lastid_gen = ofd->ofd_lastid_gen;
+
                /* destroy orphans */
                if (lustre_msg_get_conn_cnt(tgt_ses_req(tsi)->rq_reqmsg) <
-                   tsi->tsi_exp->exp_conn_cnt) {
+                   exp->exp_conn_cnt) {
                        CERROR("%s: dropping old orphan cleanup request\n",
                               ofd_name(ofd));
                        GOTO(out_nolock, rc = 0);
@@ -1251,7 +1294,7 @@ static int ofd_create_hdl(struct tgt_session_info *tsi)
                        /* FIXME: should reset precreate_next_id on MDS */
                        rc = 0;
                } else if (diff < 0) {
-                       rc = ofd_orphans_destroy(tsi->tsi_env, tsi->tsi_exp,
+                       rc = ofd_orphans_destroy(tsi->tsi_env, exp,
                                                 ofd, rep_oa);
                        oseq->os_destroys_in_progress = 0;
                } else {
@@ -1259,9 +1302,15 @@ static int ofd_create_hdl(struct tgt_session_info *tsi)
                        oseq->os_destroys_in_progress = 0;
                }
        } else {
+               if (unlikely(exp->exp_filter_data.fed_lastid_gen !=
+                            ofd->ofd_lastid_gen)) {
+                       ofd_obd_disconnect(exp);
+                       GOTO(out_nolock, rc = -ENOTCONN);
+               }
+
                mutex_lock(&oseq->os_create_lock);
                if (lustre_msg_get_conn_cnt(tgt_ses_req(tsi)->rq_reqmsg) <
-                   tsi->tsi_exp->exp_conn_cnt) {
+                   exp->exp_conn_cnt) {
                        CERROR("%s: dropping old precreate request\n",
                               ofd_name(ofd));
                        GOTO(out, rc = 0);
@@ -1366,7 +1415,7 @@ static int ofd_create_hdl(struct tgt_session_info *tsi)
                ostid_set_id(&rep_oa->o_oi, ofd_seq_last_oid(oseq));
        }
        EXIT;
-       ofd_counter_incr(tsi->tsi_exp, LPROC_OFD_STATS_CREATE,
+       ofd_counter_incr(exp, LPROC_OFD_STATS_CREATE,
                         tsi->tsi_jobid, 1);
 out:
        mutex_unlock(&oseq->os_create_lock);
@@ -1375,6 +1424,9 @@ out_nolock:
                rep_oa->o_valid |= OBD_MD_FLID | OBD_MD_FLGROUP;
 
        ofd_seq_put(tsi->tsi_env, oseq);
+
+out_sem:
+       up_read(&ofd->ofd_lastid_rwsem);
        return rc;
 }
 
@@ -2070,6 +2122,7 @@ static int ofd_init0(const struct lu_env *env, struct ofd_device *m,
        spin_lock_init(&m->ofd_batch_lock);
        rwlock_init(&obd->u.filter.fo_sptlrpc_lock);
        sptlrpc_rule_set_init(&obd->u.filter.fo_sptlrpc_rset);
+       init_rwsem(&m->ofd_lastid_rwsem);
 
        obd->u.filter.fo_fl_oss_capa = 0;
        CFS_INIT_LIST_HEAD(&obd->u.filter.fo_capa_keys);