Whamcloud - gitweb
LU-2844 mdd: mdt prepare failure should not oops
[fs/lustre-release.git] / lustre / obdclass / llog_osd.c
index 4ebe3a8..70aa4c4 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Intel, Inc.
+ * Copyright (c) 2012, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
@@ -109,11 +109,12 @@ static int llog_osd_pad(const struct lu_env *env, struct dt_object *o,
 
        lgi->lgi_buf.lb_buf = &lgi->lgi_lrh;
        lgi->lgi_buf.lb_len = sizeof(lgi->lgi_lrh);
+       dt_write_lock(env, o, 0);
        rc = dt_record_write(env, o, &lgi->lgi_buf, off, th);
        if (rc) {
                CERROR("%s: error writing padding record: rc = %d\n",
                       o->do_lu.lo_dev->ld_obd->obd_name, rc);
-               RETURN(rc);
+               GOTO(out, rc);
        }
 
        lgi->lgi_buf.lb_buf = &lgi->lgi_tail;
@@ -123,7 +124,8 @@ static int llog_osd_pad(const struct lu_env *env, struct dt_object *o,
        if (rc)
                CERROR("%s: error writing padding record: rc = %d\n",
                       o->do_lu.lo_dev->ld_obd->obd_name, rc);
-
+out:
+       dt_write_unlock(env, o);
        RETURN(rc);
 }
 
@@ -157,6 +159,8 @@ static int llog_osd_write_blob(const struct lu_env *env, struct dt_object *o,
        }
 
        /* the buf case */
+       /* protect the following 3 writes from concurrent read */
+       dt_write_lock(env, o, 0);
        rec->lrh_len = sizeof(*rec) + buflen + sizeof(lgi->lgi_tail);
        lgi->lgi_buf.lb_len = sizeof(*rec);
        lgi->lgi_buf.lb_buf = rec;
@@ -185,6 +189,7 @@ static int llog_osd_write_blob(const struct lu_env *env, struct dt_object *o,
                CERROR("%s: error writing log tail: rc = %d\n",
                       o->do_lu.lo_dev->ld_obd->obd_name, rc);
 out:
+       dt_write_unlock(env, o);
        RETURN(rc);
 }
 
@@ -429,15 +434,15 @@ static int llog_osd_write_rec(const struct lu_env *env,
        /* The caller should make sure only 1 process access the lgh_last_idx,
         * Otherwise it might hit the assert.*/
        LASSERT(index < LLOG_BITMAP_SIZE(llh));
-       cfs_spin_lock(&loghandle->lgh_hdr_lock);
+       spin_lock(&loghandle->lgh_hdr_lock);
        if (ext2_set_bit(index, llh->llh_bitmap)) {
                CERROR("%s: index %u already set in log bitmap\n",
                       o->do_lu.lo_dev->ld_obd->obd_name, index);
-               cfs_spin_unlock(&loghandle->lgh_hdr_lock);
+               spin_unlock(&loghandle->lgh_hdr_lock);
                LBUG(); /* should never happen */
        }
        llh->llh_count++;
-       cfs_spin_unlock(&loghandle->lgh_hdr_lock);
+       spin_unlock(&loghandle->lgh_hdr_lock);
        llh->llh_tail.lrt_index = index;
 
        lgi->lgi_off = 0;
@@ -536,7 +541,19 @@ static int llog_osd_next_block(const struct lu_env *env,
                lgi->lgi_buf.lb_len = LLOG_CHUNK_SIZE -
                                      (*cur_offset & (LLOG_CHUNK_SIZE - 1));
                lgi->lgi_buf.lb_buf = buf;
+
+               /* Note: read lock is not needed around la_size get above at
+                * the time of dt_attr_get(). There are only two cases that
+                * matter. Either la_size == cur_offset, in which case the
+                * entire read is skipped, or la_size > cur_offset and the loop
+                * is entered and this thread is blocked at dt_read_lock()
+                * until the write is completed. When the write completes, then
+                * the dt_read() will be done with the full length, and will
+                * get the full data.
+                */
+               dt_read_lock(env, o, 0);
                rc = dt_read(env, o, &lgi->lgi_buf, cur_offset);
+               dt_read_unlock(env, o);
                if (rc < 0) {
                        CERROR("%s: can't read llog block from log "DFID
                               " offset "LPU64": rc = %d\n",
@@ -646,7 +663,12 @@ static int llog_osd_prev_block(const struct lu_env *env,
 
                lgi->lgi_buf.lb_len = len;
                lgi->lgi_buf.lb_buf = buf;
+               /* It is OK to have locking around dt_read() only, see
+                * comment in llog_osd_next_block for details
+                */
+               dt_read_lock(env, o, 0);
                rc = dt_read(env, o, &lgi->lgi_buf, &cur_offset);
+               dt_read_unlock(env, o);
                if (rc < 0) {
                        CERROR("%s: can't read llog block from log "DFID
                               " offset "LPU64": rc = %d\n",
@@ -708,6 +730,28 @@ out:
        return rc;
 }
 
+struct dt_object *llog_osd_dir_get(const struct lu_env *env,
+                                  struct llog_ctxt *ctxt)
+{
+       struct dt_device        *dt;
+       struct dt_thread_info   *dti = dt_info(env);
+       struct dt_object        *dir;
+       int                      rc;
+
+       dt = ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt;
+       if (ctxt->loc_dir == NULL) {
+               rc = dt_root_get(env, dt, &dti->dti_fid);
+               if (rc)
+                       return ERR_PTR(rc);
+               dir = dt_locate(env, dt, &dti->dti_fid);
+       } else {
+               lu_object_get(&ctxt->loc_dir->do_lu);
+               dir = ctxt->loc_dir;
+       }
+
+       return dir;
+}
+
 static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
                         struct llog_logid *logid, char *name,
                         enum llog_open_param open_param)
@@ -733,9 +777,9 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
        if (IS_ERR(ls))
                RETURN(PTR_ERR(ls));
 
-       cfs_mutex_lock(&ls->ls_los_mutex);
+       mutex_lock(&ls->ls_los_mutex);
        los = dt_los_find(ls, FID_SEQ_LLOG);
-       cfs_mutex_unlock(&ls->ls_los_mutex);
+       mutex_unlock(&ls->ls_los_mutex);
        LASSERT(los);
        ls_device_put(env, ls);
 
@@ -744,10 +788,15 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
        if (logid != NULL) {
                logid_to_fid(logid, &lgi->lgi_fid);
        } else if (name) {
-               LASSERT(ctxt->loc_dir);
-               dt_read_lock(env, ctxt->loc_dir, 0);
-               rc = dt_lookup_dir(env, ctxt->loc_dir, name, &lgi->lgi_fid);
-               dt_read_unlock(env, ctxt->loc_dir);
+               struct dt_object *llog_dir;
+
+               llog_dir = llog_osd_dir_get(env, ctxt);
+               if (IS_ERR(llog_dir))
+                       GOTO(out, rc = PTR_ERR(llog_dir));
+               dt_read_lock(env, llog_dir, 0);
+               rc = dt_lookup_dir(env, llog_dir, name, &lgi->lgi_fid);
+               dt_read_unlock(env, llog_dir);
+               lu_object_put(env, &llog_dir->do_lu);
                if (rc == -ENOENT && open_param == LLOG_OPEN_NEW) {
                        /* generate fid for new llog */
                        rc = local_object_fid_generate(env, los,
@@ -830,12 +879,17 @@ static int llog_osd_declare_create(const struct lu_env *env,
                RETURN(rc);
 
        if (res->lgh_name) {
-               LASSERT(res->lgh_ctxt->loc_dir);
+               struct dt_object *llog_dir;
+
+               llog_dir = llog_osd_dir_get(env, res->lgh_ctxt);
+               if (IS_ERR(llog_dir))
+                       RETURN(PTR_ERR(llog_dir));
                dt_declare_ref_add(env, o, th);
                logid_to_fid(&res->lgh_id, &lgi->lgi_fid);
-               rc = dt_declare_insert(env, res->lgh_ctxt->loc_dir,
+               rc = dt_declare_insert(env, llog_dir,
                                       (struct dt_rec *)&lgi->lgi_fid,
                                       (struct dt_key *)res->lgh_name, th);
+               lu_object_put(env, &llog_dir->do_lu);
                if (rc)
                        CERROR("%s: can't declare named llog %s: rc = %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
@@ -879,14 +933,20 @@ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res,
                RETURN(rc);
 
        if (res->lgh_name) {
-               LASSERT(res->lgh_ctxt->loc_dir);
+               struct dt_object *llog_dir;
+
+               llog_dir = llog_osd_dir_get(env, res->lgh_ctxt);
+               if (IS_ERR(llog_dir))
+                       RETURN(PTR_ERR(llog_dir));
+
                logid_to_fid(&res->lgh_id, &lgi->lgi_fid);
-               dt_read_lock(env, res->lgh_ctxt->loc_dir, 0);
-               rc = dt_insert(env, res->lgh_ctxt->loc_dir,
+               dt_read_lock(env, llog_dir, 0);
+               rc = dt_insert(env, llog_dir,
                               (struct dt_rec *)&lgi->lgi_fid,
                               (struct dt_key *)res->lgh_name,
                               th, BYPASS_CAPA, 1);
-               dt_read_unlock(env, res->lgh_ctxt->loc_dir);
+               dt_read_unlock(env, llog_dir);
+               lu_object_put(env, &llog_dir->do_lu);
                if (rc)
                        CERROR("%s: can't create named llog %s: rc = %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
@@ -919,8 +979,9 @@ static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle)
 static int llog_osd_destroy(const struct lu_env *env,
                            struct llog_handle *loghandle)
 {
+       struct llog_thread_info *lgi = llog_info(env);
        struct llog_ctxt        *ctxt;
-       struct dt_object        *o;
+       struct dt_object        *o, *llog_dir = NULL;
        struct dt_device        *d;
        struct thandle          *th;
        char                    *name = NULL;
@@ -943,10 +1004,13 @@ static int llog_osd_destroy(const struct lu_env *env,
                RETURN(PTR_ERR(th));
 
        if (loghandle->lgh_name) {
-               LASSERT(ctxt->loc_dir);
+               llog_dir = llog_osd_dir_get(env, ctxt);
+               if (IS_ERR(llog_dir))
+                       GOTO(out_trans, rc = PTR_ERR(llog_dir));
+
                dt_declare_ref_del(env, o, th);
                name = loghandle->lgh_name;
-               rc = dt_declare_delete(env, ctxt->loc_dir,
+               rc = dt_declare_delete(env, llog_dir,
                                       (struct dt_key *)name, th);
                if (rc)
                        GOTO(out_trans, rc);
@@ -966,11 +1030,11 @@ static int llog_osd_destroy(const struct lu_env *env,
        if (dt_object_exists(o)) {
                if (name) {
                        dt_ref_del(env, o, th);
-                       dt_read_lock(env, ctxt->loc_dir, 0);
-                       rc = dt_delete(env, ctxt->loc_dir,
+                       dt_read_lock(env, llog_dir, 0);
+                       rc = dt_delete(env, llog_dir,
                                       (struct dt_key *) name,
                                       th, BYPASS_CAPA);
-                       dt_read_unlock(env, ctxt->loc_dir);
+                       dt_read_unlock(env, llog_dir);
                        if (rc) {
                                CERROR("%s: can't remove llog %s: rc = %d\n",
                                       o->do_lu.lo_dev->ld_obd->obd_name,
@@ -978,7 +1042,18 @@ static int llog_osd_destroy(const struct lu_env *env,
                                GOTO(out_unlock, rc);
                        }
                }
-               dt_ref_del(env, o, th);
+               /*
+                * XXX: compatibility bits
+                *      on old filesystems llogs are referenced by the name
+                *      on the new ones they are referenced by OI and by
+                *      the name
+                */
+               rc = dt_attr_get(env, o, &lgi->lgi_attr, NULL);
+               if (rc)
+                       GOTO(out_unlock, rc);
+               LASSERT(lgi->lgi_attr.la_nlink < 2);
+               if (lgi->lgi_attr.la_nlink == 1)
+                       dt_ref_del(env, o, th);
                rc = dt_destroy(env, o, th);
                if (rc)
                        GOTO(out_unlock, rc);
@@ -987,6 +1062,8 @@ out_unlock:
        dt_write_unlock(env, o);
 out_trans:
        dt_trans_stop(env, d, th);
+       if (llog_dir != NULL)
+               lu_object_put(env, &llog_dir->do_lu);
        RETURN(rc);
 }
 
@@ -1030,9 +1107,9 @@ static int llog_osd_cleanup(const struct lu_env *env, struct llog_ctxt *ctxt)
        if (IS_ERR(ls))
                RETURN(PTR_ERR(ls));
 
-       cfs_mutex_lock(&ls->ls_los_mutex);
+       mutex_lock(&ls->ls_los_mutex);
        los = dt_los_find(ls, FID_SEQ_LLOG);
-       cfs_mutex_unlock(&ls->ls_los_mutex);
+       mutex_unlock(&ls->ls_los_mutex);
        if (los != NULL) {
                dt_los_put(los);
                local_oid_storage_fini(env, los);
@@ -1075,6 +1152,7 @@ int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d,
        lgi->lgi_off = idx *  sizeof(*idarray);
 
        lu_local_obj_fid(&lgi->lgi_fid, LLOG_CATALOGS_OID);
+
        o = dt_locate(env, d, &lgi->lgi_fid);
        if (IS_ERR(o))
                RETURN(PTR_ERR(o));
@@ -1169,6 +1247,7 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d,
        lgi->lgi_off = idx * sizeof(*idarray);
 
        lu_local_obj_fid(&lgi->lgi_fid, LLOG_CATALOGS_OID);
+
        o = dt_locate(env, d, &lgi->lgi_fid);
        if (IS_ERR(o))
                RETURN(PTR_ERR(o));