Whamcloud - gitweb
LU-3319 lprocfs: update nodemap lproc to newer kernel
[fs/lustre-release.git] / lustre / obdclass / llog_osd.c
index 80904c5..fd9bfc3 100644 (file)
@@ -27,7 +27,7 @@
  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
  * Use is subject to license terms.
  *
- * Copyright (c) 2011, 2012, Intel, Inc.
+ * Copyright (c) 2012, 2013, Intel Corporation.
  */
 /*
  * This file is part of Lustre, http://www.lustre.org/
 
 #define DEBUG_SUBSYSTEM S_LOG
 
-#ifndef EXPORT_SYMTAB
-#define EXPORT_SYMTAB
-#endif
-
 #include <obd.h>
 #include <obd_class.h>
 #include <lustre_fid.h>
@@ -148,6 +144,9 @@ static int llog_osd_write_blob(const struct lu_env *env, struct dt_object *o,
        CDEBUG(D_OTHER, "write blob with type %x, buf %p/%u at off %llu\n",
               rec->lrh_type, buf, buflen, *off);
 
+       lgi->lgi_attr.la_valid = LA_SIZE;
+       lgi->lgi_attr.la_size = *off;
+
        if (!buf) {
                lgi->lgi_buf.lb_len = buflen;
                lgi->lgi_buf.lb_buf = rec;
@@ -155,7 +154,7 @@ static int llog_osd_write_blob(const struct lu_env *env, struct dt_object *o,
                if (rc)
                        CERROR("%s: error writing log record: rc = %d\n",
                               o->do_lu.lo_dev->ld_obd->obd_name, rc);
-               RETURN(rc);
+               GOTO(out, rc);
        }
 
        /* the buf case */
@@ -168,7 +167,7 @@ static int llog_osd_write_blob(const struct lu_env *env, struct dt_object *o,
        if (rc) {
                CERROR("%s: error writing log hdr: rc = %d\n",
                       o->do_lu.lo_dev->ld_obd->obd_name, rc);
-               GOTO(out, rc);
+               GOTO(out_unlock, rc);
        }
 
        lgi->lgi_buf.lb_len = buflen;
@@ -177,7 +176,7 @@ static int llog_osd_write_blob(const struct lu_env *env, struct dt_object *o,
        if (rc) {
                CERROR("%s: error writing log buffer: rc = %d\n",
                       o->do_lu.lo_dev->ld_obd->obd_name,  rc);
-               GOTO(out, rc);
+               GOTO(out_unlock, rc);
        }
 
        lgi->lgi_tail.lrt_len = rec->lrh_len;
@@ -188,8 +187,18 @@ static int llog_osd_write_blob(const struct lu_env *env, struct dt_object *o,
        if (rc)
                CERROR("%s: error writing log tail: rc = %d\n",
                       o->do_lu.lo_dev->ld_obd->obd_name, rc);
-out:
+
+out_unlock:
        dt_write_unlock(env, o);
+
+out:
+       /* cleanup the content written above */
+       if (rc) {
+               dt_punch(env, o, lgi->lgi_attr.la_size, OBD_OBJECT_EOF, th,
+                        BYPASS_CAPA);
+               dt_attr_set(env, o, &lgi->lgi_attr, th, BYPASS_CAPA);
+       }
+
        RETURN(rc);
 }
 
@@ -274,12 +283,16 @@ static int llog_osd_declare_write_rec(const struct lu_env *env,
        LASSERT(env);
        LASSERT(th);
        LASSERT(loghandle);
+       LASSERT(rec);
+       LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE);
 
        o = loghandle->lgh_obj;
        LASSERT(o);
 
+       lgi->lgi_buf.lb_len = sizeof(struct llog_log_hdr);
+       lgi->lgi_buf.lb_buf = NULL;
        /* each time we update header */
-       rc = dt_declare_record_write(env, o, sizeof(struct llog_log_hdr), 0,
+       rc = dt_declare_record_write(env, o, &lgi->lgi_buf, 0,
                                     th);
        if (rc || idx == 0) /* if error or just header */
                RETURN(rc);
@@ -290,12 +303,18 @@ static int llog_osd_declare_write_rec(const struct lu_env *env,
                LASSERT(ergo(rc == 0, lgi->lgi_attr.la_valid & LA_SIZE));
                if (rc)
                        RETURN(rc);
+
+               rc = dt_declare_punch(env, o, lgi->lgi_off, OBD_OBJECT_EOF, th);
+               if (rc)
+                       RETURN(rc);
        } else {
                lgi->lgi_off = 0;
        }
 
+       lgi->lgi_buf.lb_len = rec->lrh_len;
+       lgi->lgi_buf.lb_buf = NULL;
        /* XXX: implement declared window or multi-chunks approach */
-       rc = dt_declare_record_write(env, o, 32 * 1024, lgi->lgi_off, th);
+       rc = dt_declare_record_write(env, o, &lgi->lgi_buf, -1, th);
 
        RETURN(rc);
 }
@@ -311,7 +330,7 @@ static int llog_osd_write_rec(const struct lu_env *env,
        struct llog_thread_info *lgi = llog_info(env);
        struct llog_log_hdr     *llh;
        int                      reclen = rec->lrh_len;
-       int                      index, rc;
+       int                      index, rc, old_tail_idx;
        struct llog_rec_tail    *lrt;
        struct dt_object        *o;
        size_t                   left;
@@ -373,9 +392,10 @@ static int llog_osd_write_rec(const struct lu_env *env,
                        /* We assume that caller has set lgh_cur_* */
                        lgi->lgi_off = loghandle->lgh_cur_offset;
                        CDEBUG(D_OTHER,
-                              "modify record "LPX64": idx:%d/%u/%d, len:%u "
+                              "modify record "DOSTID": idx:%d/%u/%d, len:%u "
                               "offset %llu\n",
-                              loghandle->lgh_id.lgl_oid, idx, rec->lrh_index,
+                              POSTID(&loghandle->lgh_id.lgl_oi), idx,
+                              rec->lrh_index,
                               loghandle->lgh_cur_idx, rec->lrh_len,
                               (long long)(lgi->lgi_off - sizeof(*llh)));
                        if (rec->lrh_index != loghandle->lgh_cur_idx) {
@@ -434,35 +454,51 @@ static int llog_osd_write_rec(const struct lu_env *env,
        /* The caller should make sure only 1 process access the lgh_last_idx,
         * Otherwise it might hit the assert.*/
        LASSERT(index < LLOG_BITMAP_SIZE(llh));
-       cfs_spin_lock(&loghandle->lgh_hdr_lock);
+       spin_lock(&loghandle->lgh_hdr_lock);
        if (ext2_set_bit(index, llh->llh_bitmap)) {
                CERROR("%s: index %u already set in log bitmap\n",
                       o->do_lu.lo_dev->ld_obd->obd_name, index);
-               cfs_spin_unlock(&loghandle->lgh_hdr_lock);
+               spin_unlock(&loghandle->lgh_hdr_lock);
                LBUG(); /* should never happen */
        }
        llh->llh_count++;
-       cfs_spin_unlock(&loghandle->lgh_hdr_lock);
+       spin_unlock(&loghandle->lgh_hdr_lock);
+       old_tail_idx = llh->llh_tail.lrt_index;
        llh->llh_tail.lrt_index = index;
 
        lgi->lgi_off = 0;
        rc = llog_osd_write_blob(env, o, &llh->llh_hdr, NULL, &lgi->lgi_off,
                                 th);
        if (rc)
-               RETURN(rc);
+               GOTO(out, rc);
 
        rc = dt_attr_get(env, o, &lgi->lgi_attr, NULL);
        if (rc)
-               RETURN(rc);
+               GOTO(out, rc);
+
        LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
        lgi->lgi_off = lgi->lgi_attr.la_size;
 
        rc = llog_osd_write_blob(env, o, rec, buf, &lgi->lgi_off, th);
-       if (rc)
-               RETURN(rc);
 
-       CDEBUG(D_RPCTRACE, "added record "LPX64": idx: %u, %u\n",
-              loghandle->lgh_id.lgl_oid, index, rec->lrh_len);
+out:
+       /* cleanup llog for error case */
+       if (rc) {
+               spin_lock(&loghandle->lgh_hdr_lock);
+               ext2_clear_bit(index, llh->llh_bitmap);
+               llh->llh_count--;
+               spin_unlock(&loghandle->lgh_hdr_lock);
+
+               /* restore the header */
+               loghandle->lgh_last_idx--;
+               llh->llh_tail.lrt_index = old_tail_idx;
+               lgi->lgi_off = 0;
+               llog_osd_write_blob(env, o, &llh->llh_hdr, NULL,
+                                   &lgi->lgi_off, th);
+       }
+
+       CDEBUG(D_RPCTRACE, "added record "DOSTID": idx: %u, %u\n",
+              POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len);
        if (rc == 0 && reccookie) {
                reccookie->lgc_lgl = loghandle->lgh_id;
                reccookie->lgc_index = index;
@@ -573,10 +609,10 @@ static int llog_osd_next_block(const struct lu_env *env,
                        GOTO(out, rc);
 
                if (rc < sizeof(*tail)) {
-                       CERROR("%s: invalid llog block at log id "LPU64"/%u "
+                       CERROR("%s: invalid llog block at log id "DOSTID"/%u "
                               "offset "LPU64"\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
-                              loghandle->lgh_id.lgl_oid,
+                              POSTID(&loghandle->lgh_id.lgl_oi),
                               loghandle->lgh_id.lgl_ogen, *cur_offset);
                        GOTO(out, rc = -EINVAL);
                }
@@ -599,10 +635,10 @@ static int llog_osd_next_block(const struct lu_env *env,
 
                /* this shouldn't happen */
                if (tail->lrt_index == 0) {
-                       CERROR("%s: invalid llog tail at log id "LPU64"/%u "
+                       CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
                               "offset "LPU64"\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
-                              loghandle->lgh_id.lgl_oid,
+                              POSTID(&loghandle->lgh_id.lgl_oi),
                               loghandle->lgh_id.lgl_ogen, *cur_offset);
                        GOTO(out, rc = -EINVAL);
                }
@@ -681,10 +717,10 @@ static int llog_osd_prev_block(const struct lu_env *env,
                        GOTO(out, rc);
 
                if (rc < sizeof(*tail)) {
-                       CERROR("%s: invalid llog block at log id "LPU64"/%u "
+                       CERROR("%s: invalid llog block at log id "DOSTID"/%u "
                               "offset "LPU64"\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
-                              loghandle->lgh_id.lgl_oid,
+                              POSTID(&loghandle->lgh_id.lgl_oi),
                               loghandle->lgh_id.lgl_ogen, cur_offset);
                        GOTO(out, rc = -EINVAL);
                }
@@ -705,10 +741,10 @@ static int llog_osd_prev_block(const struct lu_env *env,
 
                /* this shouldn't happen */
                if (tail->lrt_index == 0) {
-                       CERROR("%s: invalid llog tail at log id "LPU64"/%u "
+                       CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
                               "offset "LPU64"\n",
                               o->do_lu.lo_dev->ld_obd->obd_name,
-                              loghandle->lgh_id.lgl_oid,
+                              POSTID(&loghandle->lgh_id.lgl_oi),
                               loghandle->lgh_id.lgl_ogen, cur_offset);
                        GOTO(out, rc = -EINVAL);
                }
@@ -744,6 +780,11 @@ struct dt_object *llog_osd_dir_get(const struct lu_env *env,
                if (rc)
                        return ERR_PTR(rc);
                dir = dt_locate(env, dt, &dti->dti_fid);
+
+               if (!IS_ERR(dir) && !dt_try_as_dir(env, dir)) {
+                       lu_object_put(env, &dir->do_lu);
+                       return ERR_PTR(-ENOTDIR);
+               }
        } else {
                lu_object_get(&ctxt->loc_dir->do_lu);
                dir = ctxt->loc_dir;
@@ -777,9 +818,9 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
        if (IS_ERR(ls))
                RETURN(PTR_ERR(ls));
 
-       cfs_mutex_lock(&ls->ls_los_mutex);
-       los = dt_los_find(ls, FID_SEQ_LLOG);
-       cfs_mutex_unlock(&ls->ls_los_mutex);
+       mutex_lock(&ls->ls_los_mutex);
+       los = dt_los_find(ls, name != NULL ? FID_SEQ_LLOG_NAME : FID_SEQ_LLOG);
+       mutex_unlock(&ls->ls_los_mutex);
        LASSERT(los);
        ls_device_put(env, ls);
 
@@ -817,7 +858,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
                        GOTO(out, rc);
        }
 
-       o = ls_locate(env, ls, &lgi->lgi_fid);
+       o = ls_locate(env, ls, &lgi->lgi_fid, NULL);
        if (IS_ERR(o))
                GOTO(out_name, rc = PTR_ERR(o));
 
@@ -874,9 +915,8 @@ static int llog_osd_declare_create(const struct lu_env *env,
        if (rc)
                RETURN(rc);
 
-       rc = dt_declare_record_write(env, o, LLOG_CHUNK_SIZE, 0, th);
-       if (rc)
-               RETURN(rc);
+       /* do not declare header initialization here as it's declared
+        * in llog_osd_declare_write_rec() which is always called */
 
        if (res->lgh_name) {
                struct dt_object *llog_dir;
@@ -884,7 +924,6 @@ static int llog_osd_declare_create(const struct lu_env *env,
                llog_dir = llog_osd_dir_get(env, res->lgh_ctxt);
                if (IS_ERR(llog_dir))
                        RETURN(PTR_ERR(llog_dir));
-               dt_declare_ref_add(env, o, th);
                logid_to_fid(&res->lgh_id, &lgi->lgi_fid);
                rc = dt_declare_insert(env, llog_dir,
                                       (struct dt_rec *)&lgi->lgi_fid,
@@ -926,8 +965,7 @@ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res,
                rc = llog_osd_create_new_object(env, los, o, th);
        else
                rc = -EEXIST;
-       if (res->lgh_name)
-               dt_ref_add(env, o, th);
+
        dt_write_unlock(env, o);
        if (rc)
                RETURN(rc);
@@ -979,7 +1017,6 @@ static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle)
 static int llog_osd_destroy(const struct lu_env *env,
                            struct llog_handle *loghandle)
 {
-       struct llog_thread_info *lgi = llog_info(env);
        struct llog_ctxt        *ctxt;
        struct dt_object        *o, *llog_dir = NULL;
        struct dt_device        *d;
@@ -1008,7 +1045,6 @@ static int llog_osd_destroy(const struct lu_env *env,
                if (IS_ERR(llog_dir))
                        GOTO(out_trans, rc = PTR_ERR(llog_dir));
 
-               dt_declare_ref_del(env, o, th);
                name = loghandle->lgh_name;
                rc = dt_declare_delete(env, llog_dir,
                                       (struct dt_key *)name, th);
@@ -1029,7 +1065,6 @@ static int llog_osd_destroy(const struct lu_env *env,
        dt_write_lock(env, o, 0);
        if (dt_object_exists(o)) {
                if (name) {
-                       dt_ref_del(env, o, th);
                        dt_read_lock(env, llog_dir, 0);
                        rc = dt_delete(env, llog_dir,
                                       (struct dt_key *) name,
@@ -1042,18 +1077,7 @@ static int llog_osd_destroy(const struct lu_env *env,
                                GOTO(out_unlock, rc);
                        }
                }
-               /*
-                * XXX: compatibility bits
-                *      on old filesystems llogs are referenced by the name
-                *      on the new ones they are referenced by OI and by
-                *      the name
-                */
-               rc = dt_attr_get(env, o, &lgi->lgi_attr, NULL);
-               if (rc)
-                       GOTO(out_unlock, rc);
-               LASSERT(lgi->lgi_attr.la_nlink < 2);
-               if (lgi->lgi_attr.la_nlink == 1)
-                       dt_ref_del(env, o, th);
+               dt_ref_del(env, o, th);
                rc = dt_destroy(env, o, th);
                if (rc)
                        GOTO(out_unlock, rc);
@@ -1071,11 +1095,9 @@ static int llog_osd_setup(const struct lu_env *env, struct obd_device *obd,
                          struct obd_llog_group *olg, int ctxt_idx,
                          struct obd_device *disk_obd)
 {
-       struct local_oid_storage        *los;
        struct llog_thread_info         *lgi = llog_info(env);
        struct llog_ctxt                *ctxt;
        int                              rc = 0;
-
        ENTRY;
 
        LASSERT(obd);
@@ -1090,31 +1112,41 @@ static int llog_osd_setup(const struct lu_env *env, struct obd_device *obd,
        lgi->lgi_fid.f_oid = 1;
        lgi->lgi_fid.f_ver = 0;
        rc = local_oid_storage_init(env, disk_obd->obd_lvfs_ctxt.dt,
-                                   &lgi->lgi_fid, &los);
+                                   &lgi->lgi_fid,
+                                   &ctxt->loc_los_nameless);
+       if (rc != 0)
+               GOTO(out, rc);
+
+       lgi->lgi_fid.f_seq = FID_SEQ_LLOG_NAME;
+       lgi->lgi_fid.f_oid = 1;
+       lgi->lgi_fid.f_ver = 0;
+       rc = local_oid_storage_init(env, disk_obd->obd_lvfs_ctxt.dt,
+                                   &lgi->lgi_fid,
+                                   &ctxt->loc_los_named);
+       if (rc != 0) {
+               local_oid_storage_fini(env, ctxt->loc_los_nameless);
+               ctxt->loc_los_nameless = NULL;
+       }
+
+       GOTO(out, rc);
+
+out:
        llog_ctxt_put(ctxt);
        return rc;
 }
 
 static int llog_osd_cleanup(const struct lu_env *env, struct llog_ctxt *ctxt)
 {
-       struct dt_device                *dt;
-       struct ls_device                *ls;
-       struct local_oid_storage        *los;
-
-       LASSERT(ctxt->loc_exp->exp_obd);
-       dt = ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt;
-       ls = ls_device_get(dt);
-       if (IS_ERR(ls))
-               RETURN(PTR_ERR(ls));
+       if (ctxt->loc_los_nameless != NULL) {
+               local_oid_storage_fini(env, ctxt->loc_los_nameless);
+               ctxt->loc_los_nameless = NULL;
+       }
 
-       cfs_mutex_lock(&ls->ls_los_mutex);
-       los = dt_los_find(ls, FID_SEQ_LLOG);
-       cfs_mutex_unlock(&ls->ls_los_mutex);
-       if (los != NULL) {
-               dt_los_put(los);
-               local_oid_storage_fini(env, los);
+       if (ctxt->loc_los_named != NULL) {
+               local_oid_storage_fini(env, ctxt->loc_los_named);
+               ctxt->loc_los_named = NULL;
        }
-       ls_device_put(env, ls);
+
        return 0;
 }
 
@@ -1137,7 +1169,8 @@ EXPORT_SYMBOL(llog_osd_ops);
 
 /* reads the catalog list */
 int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d,
-                         int idx, int count, struct llog_catid *idarray)
+                         int idx, int count, struct llog_catid *idarray,
+                         const struct lu_fid *fid)
 {
        struct llog_thread_info *lgi = llog_info(env);
        struct dt_object        *o = NULL;
@@ -1151,7 +1184,7 @@ int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d,
        size = sizeof(*idarray) * count;
        lgi->lgi_off = idx *  sizeof(*idarray);
 
-       lu_local_obj_fid(&lgi->lgi_fid, LLOG_CATALOGS_OID);
+       lgi->lgi_fid = *fid;
        o = dt_locate(env, d, &lgi->lgi_fid);
        if (IS_ERR(o))
                RETURN(PTR_ERR(o));
@@ -1230,7 +1263,8 @@ EXPORT_SYMBOL(llog_osd_get_cat_list);
 
 /* writes the cat list */
 int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d,
-                         int idx, int count, struct llog_catid *idarray)
+                         int idx, int count, struct llog_catid *idarray,
+                         const struct lu_fid *fid)
 {
        struct llog_thread_info *lgi = llog_info(env);
        struct dt_object        *o = NULL;
@@ -1244,8 +1278,8 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d,
 
        size = sizeof(*idarray) * count;
        lgi->lgi_off = idx * sizeof(*idarray);
+       lgi->lgi_fid = *fid;
 
-       lu_local_obj_fid(&lgi->lgi_fid, LLOG_CATALOGS_OID);
        o = dt_locate(env, d, &lgi->lgi_fid);
        if (IS_ERR(o))
                RETURN(PTR_ERR(o));
@@ -1268,7 +1302,9 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d,
        if (IS_ERR(th))
                GOTO(out, rc = PTR_ERR(th));
 
-       rc = dt_declare_record_write(env, o, size, lgi->lgi_off, th);
+       lgi->lgi_buf.lb_len = size;
+       lgi->lgi_buf.lb_buf = idarray;
+       rc = dt_declare_record_write(env, o, &lgi->lgi_buf, lgi->lgi_off, th);
        if (rc)
                GOTO(out, rc);
 
@@ -1276,8 +1312,6 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d,
        if (rc)
                GOTO(out_trans, rc);
 
-       lgi->lgi_buf.lb_buf = idarray;
-       lgi->lgi_buf.lb_len = size;
        rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
        if (rc)
                CDEBUG(D_INODE, "error writeing CATALOGS: rc = %d\n", rc);