* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2012, Intel, Inc.
+ * Copyright (c) 2012, 2013, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#define DEBUG_SUBSYSTEM S_LOG
-#ifndef EXPORT_SYMTAB
-#define EXPORT_SYMTAB
-#endif
-
#include <obd.h>
#include <obd_class.h>
#include <lustre_fid.h>
CDEBUG(D_OTHER, "write blob with type %x, buf %p/%u at off %llu\n",
rec->lrh_type, buf, buflen, *off);
+ lgi->lgi_attr.la_valid = LA_SIZE;
+ lgi->lgi_attr.la_size = *off;
+
if (!buf) {
lgi->lgi_buf.lb_len = buflen;
lgi->lgi_buf.lb_buf = rec;
if (rc)
CERROR("%s: error writing log record: rc = %d\n",
o->do_lu.lo_dev->ld_obd->obd_name, rc);
- RETURN(rc);
+ GOTO(out, rc);
}
/* the buf case */
if (rc) {
CERROR("%s: error writing log hdr: rc = %d\n",
o->do_lu.lo_dev->ld_obd->obd_name, rc);
- GOTO(out, rc);
+ GOTO(out_unlock, rc);
}
lgi->lgi_buf.lb_len = buflen;
if (rc) {
CERROR("%s: error writing log buffer: rc = %d\n",
o->do_lu.lo_dev->ld_obd->obd_name, rc);
- GOTO(out, rc);
+ GOTO(out_unlock, rc);
}
lgi->lgi_tail.lrt_len = rec->lrh_len;
if (rc)
CERROR("%s: error writing log tail: rc = %d\n",
o->do_lu.lo_dev->ld_obd->obd_name, rc);
-out:
+
+out_unlock:
dt_write_unlock(env, o);
+
+out:
+ /* cleanup the content written above */
+ if (rc) {
+ dt_punch(env, o, lgi->lgi_attr.la_size, OBD_OBJECT_EOF, th,
+ BYPASS_CAPA);
+ dt_attr_set(env, o, &lgi->lgi_attr, th, BYPASS_CAPA);
+ }
+
RETURN(rc);
}
LASSERT(env);
LASSERT(th);
LASSERT(loghandle);
+ LASSERT(rec);
+ LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE);
o = loghandle->lgh_obj;
LASSERT(o);
+ lgi->lgi_buf.lb_len = sizeof(struct llog_log_hdr);
+ lgi->lgi_buf.lb_buf = NULL;
/* each time we update header */
- rc = dt_declare_record_write(env, o, sizeof(struct llog_log_hdr), 0,
+ rc = dt_declare_record_write(env, o, &lgi->lgi_buf, 0,
th);
if (rc || idx == 0) /* if error or just header */
RETURN(rc);
LASSERT(ergo(rc == 0, lgi->lgi_attr.la_valid & LA_SIZE));
if (rc)
RETURN(rc);
+
+ rc = dt_declare_punch(env, o, lgi->lgi_off, OBD_OBJECT_EOF, th);
+ if (rc)
+ RETURN(rc);
} else {
lgi->lgi_off = 0;
}
+ lgi->lgi_buf.lb_len = rec->lrh_len;
+ lgi->lgi_buf.lb_buf = NULL;
/* XXX: implement declared window or multi-chunks approach */
- rc = dt_declare_record_write(env, o, 32 * 1024, lgi->lgi_off, th);
+ rc = dt_declare_record_write(env, o, &lgi->lgi_buf, -1, th);
RETURN(rc);
}
struct llog_thread_info *lgi = llog_info(env);
struct llog_log_hdr *llh;
int reclen = rec->lrh_len;
- int index, rc;
+ int index, rc, old_tail_idx;
struct llog_rec_tail *lrt;
struct dt_object *o;
size_t left;
/* We assume that caller has set lgh_cur_* */
lgi->lgi_off = loghandle->lgh_cur_offset;
CDEBUG(D_OTHER,
- "modify record "LPX64": idx:%d/%u/%d, len:%u "
+ "modify record "DOSTID": idx:%d/%u/%d, len:%u "
"offset %llu\n",
- loghandle->lgh_id.lgl_oid, idx, rec->lrh_index,
+ POSTID(&loghandle->lgh_id.lgl_oi), idx,
+ rec->lrh_index,
loghandle->lgh_cur_idx, rec->lrh_len,
(long long)(lgi->lgi_off - sizeof(*llh)));
if (rec->lrh_index != loghandle->lgh_cur_idx) {
/* The caller should make sure only 1 process access the lgh_last_idx,
* Otherwise it might hit the assert.*/
LASSERT(index < LLOG_BITMAP_SIZE(llh));
- cfs_spin_lock(&loghandle->lgh_hdr_lock);
+ spin_lock(&loghandle->lgh_hdr_lock);
if (ext2_set_bit(index, llh->llh_bitmap)) {
CERROR("%s: index %u already set in log bitmap\n",
o->do_lu.lo_dev->ld_obd->obd_name, index);
- cfs_spin_unlock(&loghandle->lgh_hdr_lock);
+ spin_unlock(&loghandle->lgh_hdr_lock);
LBUG(); /* should never happen */
}
llh->llh_count++;
- cfs_spin_unlock(&loghandle->lgh_hdr_lock);
+ spin_unlock(&loghandle->lgh_hdr_lock);
+ old_tail_idx = llh->llh_tail.lrt_index;
llh->llh_tail.lrt_index = index;
lgi->lgi_off = 0;
rc = llog_osd_write_blob(env, o, &llh->llh_hdr, NULL, &lgi->lgi_off,
th);
if (rc)
- RETURN(rc);
+ GOTO(out, rc);
rc = dt_attr_get(env, o, &lgi->lgi_attr, NULL);
if (rc)
- RETURN(rc);
+ GOTO(out, rc);
+
LASSERT(lgi->lgi_attr.la_valid & LA_SIZE);
lgi->lgi_off = lgi->lgi_attr.la_size;
rc = llog_osd_write_blob(env, o, rec, buf, &lgi->lgi_off, th);
- if (rc)
- RETURN(rc);
- CDEBUG(D_RPCTRACE, "added record "LPX64": idx: %u, %u\n",
- loghandle->lgh_id.lgl_oid, index, rec->lrh_len);
+out:
+ /* cleanup llog for error case */
+ if (rc) {
+ spin_lock(&loghandle->lgh_hdr_lock);
+ ext2_clear_bit(index, llh->llh_bitmap);
+ llh->llh_count--;
+ spin_unlock(&loghandle->lgh_hdr_lock);
+
+ /* restore the header */
+ loghandle->lgh_last_idx--;
+ llh->llh_tail.lrt_index = old_tail_idx;
+ lgi->lgi_off = 0;
+ llog_osd_write_blob(env, o, &llh->llh_hdr, NULL,
+ &lgi->lgi_off, th);
+ }
+
+ CDEBUG(D_RPCTRACE, "added record "DOSTID": idx: %u, %u\n",
+ POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len);
if (rc == 0 && reccookie) {
reccookie->lgc_lgl = loghandle->lgh_id;
reccookie->lgc_index = index;
GOTO(out, rc);
if (rc < sizeof(*tail)) {
- CERROR("%s: invalid llog block at log id "LPU64"/%u "
+ CERROR("%s: invalid llog block at log id "DOSTID"/%u "
"offset "LPU64"\n",
o->do_lu.lo_dev->ld_obd->obd_name,
- loghandle->lgh_id.lgl_oid,
+ POSTID(&loghandle->lgh_id.lgl_oi),
loghandle->lgh_id.lgl_ogen, *cur_offset);
GOTO(out, rc = -EINVAL);
}
/* this shouldn't happen */
if (tail->lrt_index == 0) {
- CERROR("%s: invalid llog tail at log id "LPU64"/%u "
+ CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
"offset "LPU64"\n",
o->do_lu.lo_dev->ld_obd->obd_name,
- loghandle->lgh_id.lgl_oid,
+ POSTID(&loghandle->lgh_id.lgl_oi),
loghandle->lgh_id.lgl_ogen, *cur_offset);
GOTO(out, rc = -EINVAL);
}
GOTO(out, rc);
if (rc < sizeof(*tail)) {
- CERROR("%s: invalid llog block at log id "LPU64"/%u "
+ CERROR("%s: invalid llog block at log id "DOSTID"/%u "
"offset "LPU64"\n",
o->do_lu.lo_dev->ld_obd->obd_name,
- loghandle->lgh_id.lgl_oid,
+ POSTID(&loghandle->lgh_id.lgl_oi),
loghandle->lgh_id.lgl_ogen, cur_offset);
GOTO(out, rc = -EINVAL);
}
/* this shouldn't happen */
if (tail->lrt_index == 0) {
- CERROR("%s: invalid llog tail at log id "LPU64"/%u "
+ CERROR("%s: invalid llog tail at log id "DOSTID"/%u "
"offset "LPU64"\n",
o->do_lu.lo_dev->ld_obd->obd_name,
- loghandle->lgh_id.lgl_oid,
+ POSTID(&loghandle->lgh_id.lgl_oi),
loghandle->lgh_id.lgl_ogen, cur_offset);
GOTO(out, rc = -EINVAL);
}
if (rc)
return ERR_PTR(rc);
dir = dt_locate(env, dt, &dti->dti_fid);
+
+ if (!IS_ERR(dir) && !dt_try_as_dir(env, dir)) {
+ lu_object_put(env, &dir->do_lu);
+ return ERR_PTR(-ENOTDIR);
+ }
} else {
lu_object_get(&ctxt->loc_dir->do_lu);
dir = ctxt->loc_dir;
if (IS_ERR(ls))
RETURN(PTR_ERR(ls));
- cfs_mutex_lock(&ls->ls_los_mutex);
- los = dt_los_find(ls, FID_SEQ_LLOG);
- cfs_mutex_unlock(&ls->ls_los_mutex);
+ mutex_lock(&ls->ls_los_mutex);
+ los = dt_los_find(ls, name != NULL ? FID_SEQ_LLOG_NAME : FID_SEQ_LLOG);
+ mutex_unlock(&ls->ls_los_mutex);
LASSERT(los);
ls_device_put(env, ls);
GOTO(out, rc);
}
- o = ls_locate(env, ls, &lgi->lgi_fid);
+ o = ls_locate(env, ls, &lgi->lgi_fid, NULL);
if (IS_ERR(o))
GOTO(out_name, rc = PTR_ERR(o));
if (rc)
RETURN(rc);
- rc = dt_declare_record_write(env, o, LLOG_CHUNK_SIZE, 0, th);
- if (rc)
- RETURN(rc);
+ /* do not declare header initialization here as it's declared
+ * in llog_osd_declare_write_rec() which is always called */
if (res->lgh_name) {
struct dt_object *llog_dir;
llog_dir = llog_osd_dir_get(env, res->lgh_ctxt);
if (IS_ERR(llog_dir))
RETURN(PTR_ERR(llog_dir));
- dt_declare_ref_add(env, o, th);
logid_to_fid(&res->lgh_id, &lgi->lgi_fid);
rc = dt_declare_insert(env, llog_dir,
(struct dt_rec *)&lgi->lgi_fid,
rc = llog_osd_create_new_object(env, los, o, th);
else
rc = -EEXIST;
- if (res->lgh_name)
- dt_ref_add(env, o, th);
+
dt_write_unlock(env, o);
if (rc)
RETURN(rc);
static int llog_osd_destroy(const struct lu_env *env,
struct llog_handle *loghandle)
{
- struct llog_thread_info *lgi = llog_info(env);
struct llog_ctxt *ctxt;
struct dt_object *o, *llog_dir = NULL;
struct dt_device *d;
if (IS_ERR(llog_dir))
GOTO(out_trans, rc = PTR_ERR(llog_dir));
- dt_declare_ref_del(env, o, th);
name = loghandle->lgh_name;
rc = dt_declare_delete(env, llog_dir,
(struct dt_key *)name, th);
dt_write_lock(env, o, 0);
if (dt_object_exists(o)) {
if (name) {
- dt_ref_del(env, o, th);
dt_read_lock(env, llog_dir, 0);
rc = dt_delete(env, llog_dir,
(struct dt_key *) name,
GOTO(out_unlock, rc);
}
}
- /*
- * XXX: compatibility bits
- * on old filesystems llogs are referenced by the name
- * on the new ones they are referenced by OI and by
- * the name
- */
- rc = dt_attr_get(env, o, &lgi->lgi_attr, NULL);
- if (rc)
- GOTO(out_unlock, rc);
- LASSERT(lgi->lgi_attr.la_nlink < 2);
- if (lgi->lgi_attr.la_nlink == 1)
- dt_ref_del(env, o, th);
+ dt_ref_del(env, o, th);
rc = dt_destroy(env, o, th);
if (rc)
GOTO(out_unlock, rc);
struct obd_llog_group *olg, int ctxt_idx,
struct obd_device *disk_obd)
{
- struct local_oid_storage *los;
struct llog_thread_info *lgi = llog_info(env);
struct llog_ctxt *ctxt;
int rc = 0;
-
ENTRY;
LASSERT(obd);
lgi->lgi_fid.f_oid = 1;
lgi->lgi_fid.f_ver = 0;
rc = local_oid_storage_init(env, disk_obd->obd_lvfs_ctxt.dt,
- &lgi->lgi_fid, &los);
+ &lgi->lgi_fid,
+ &ctxt->loc_los_nameless);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ lgi->lgi_fid.f_seq = FID_SEQ_LLOG_NAME;
+ lgi->lgi_fid.f_oid = 1;
+ lgi->lgi_fid.f_ver = 0;
+ rc = local_oid_storage_init(env, disk_obd->obd_lvfs_ctxt.dt,
+ &lgi->lgi_fid,
+ &ctxt->loc_los_named);
+ if (rc != 0) {
+ local_oid_storage_fini(env, ctxt->loc_los_nameless);
+ ctxt->loc_los_nameless = NULL;
+ }
+
+ GOTO(out, rc);
+
+out:
llog_ctxt_put(ctxt);
return rc;
}
static int llog_osd_cleanup(const struct lu_env *env, struct llog_ctxt *ctxt)
{
- struct dt_device *dt;
- struct ls_device *ls;
- struct local_oid_storage *los;
-
- LASSERT(ctxt->loc_exp->exp_obd);
- dt = ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt;
- ls = ls_device_get(dt);
- if (IS_ERR(ls))
- RETURN(PTR_ERR(ls));
+ if (ctxt->loc_los_nameless != NULL) {
+ local_oid_storage_fini(env, ctxt->loc_los_nameless);
+ ctxt->loc_los_nameless = NULL;
+ }
- cfs_mutex_lock(&ls->ls_los_mutex);
- los = dt_los_find(ls, FID_SEQ_LLOG);
- cfs_mutex_unlock(&ls->ls_los_mutex);
- if (los != NULL) {
- dt_los_put(los);
- local_oid_storage_fini(env, los);
+ if (ctxt->loc_los_named != NULL) {
+ local_oid_storage_fini(env, ctxt->loc_los_named);
+ ctxt->loc_los_named = NULL;
}
- ls_device_put(env, ls);
+
return 0;
}
/* reads the catalog list */
int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d,
- int idx, int count, struct llog_catid *idarray)
+ int idx, int count, struct llog_catid *idarray,
+ const struct lu_fid *fid)
{
struct llog_thread_info *lgi = llog_info(env);
struct dt_object *o = NULL;
size = sizeof(*idarray) * count;
lgi->lgi_off = idx * sizeof(*idarray);
- lu_local_obj_fid(&lgi->lgi_fid, LLOG_CATALOGS_OID);
+ lgi->lgi_fid = *fid;
o = dt_locate(env, d, &lgi->lgi_fid);
if (IS_ERR(o))
RETURN(PTR_ERR(o));
/* writes the cat list */
int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d,
- int idx, int count, struct llog_catid *idarray)
+ int idx, int count, struct llog_catid *idarray,
+ const struct lu_fid *fid)
{
struct llog_thread_info *lgi = llog_info(env);
struct dt_object *o = NULL;
size = sizeof(*idarray) * count;
lgi->lgi_off = idx * sizeof(*idarray);
+ lgi->lgi_fid = *fid;
- lu_local_obj_fid(&lgi->lgi_fid, LLOG_CATALOGS_OID);
o = dt_locate(env, d, &lgi->lgi_fid);
if (IS_ERR(o))
RETURN(PTR_ERR(o));
if (IS_ERR(th))
GOTO(out, rc = PTR_ERR(th));
- rc = dt_declare_record_write(env, o, size, lgi->lgi_off, th);
+ lgi->lgi_buf.lb_len = size;
+ lgi->lgi_buf.lb_buf = idarray;
+ rc = dt_declare_record_write(env, o, &lgi->lgi_buf, lgi->lgi_off, th);
if (rc)
GOTO(out, rc);
if (rc)
GOTO(out_trans, rc);
- lgi->lgi_buf.lb_buf = idarray;
- lgi->lgi_buf.lb_len = size;
rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
if (rc)
CDEBUG(D_INODE, "error writeing CATALOGS: rc = %d\n", rc);