X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fobdclass%2Fllog_osd.c;h=fd9bfc360af298129b74a19eb44718eac7f3d52e;hb=a77dd45edb772259813bcb44ba7bf7a2bf4a35ed;hp=80904c51532d70371ff7de2bf3969e6e7c79390d;hpb=6c37a25e0084143a2c5e00f765a084306df90167;p=fs%2Flustre-release.git diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index 80904c5..fd9bfc36 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -27,7 +27,7 @@ * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2011, 2012, Intel, Inc. + * Copyright (c) 2012, 2013, Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ @@ -41,10 +41,6 @@ #define DEBUG_SUBSYSTEM S_LOG -#ifndef EXPORT_SYMTAB -#define EXPORT_SYMTAB -#endif - #include #include #include @@ -148,6 +144,9 @@ static int llog_osd_write_blob(const struct lu_env *env, struct dt_object *o, CDEBUG(D_OTHER, "write blob with type %x, buf %p/%u at off %llu\n", rec->lrh_type, buf, buflen, *off); + lgi->lgi_attr.la_valid = LA_SIZE; + lgi->lgi_attr.la_size = *off; + if (!buf) { lgi->lgi_buf.lb_len = buflen; lgi->lgi_buf.lb_buf = rec; @@ -155,7 +154,7 @@ static int llog_osd_write_blob(const struct lu_env *env, struct dt_object *o, if (rc) CERROR("%s: error writing log record: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, rc); - RETURN(rc); + GOTO(out, rc); } /* the buf case */ @@ -168,7 +167,7 @@ static int llog_osd_write_blob(const struct lu_env *env, struct dt_object *o, if (rc) { CERROR("%s: error writing log hdr: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, rc); - GOTO(out, rc); + GOTO(out_unlock, rc); } lgi->lgi_buf.lb_len = buflen; @@ -177,7 +176,7 @@ static int llog_osd_write_blob(const struct lu_env *env, struct dt_object *o, if (rc) { CERROR("%s: error writing log buffer: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, rc); - GOTO(out, rc); + GOTO(out_unlock, rc); } lgi->lgi_tail.lrt_len = rec->lrh_len; @@ -188,8 +187,18 @@ static int llog_osd_write_blob(const struct lu_env *env, struct dt_object *o, if (rc) CERROR("%s: error writing log tail: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, rc); -out: + +out_unlock: dt_write_unlock(env, o); + +out: + /* cleanup the content written above */ + if (rc) { + dt_punch(env, o, lgi->lgi_attr.la_size, OBD_OBJECT_EOF, th, + BYPASS_CAPA); + dt_attr_set(env, o, &lgi->lgi_attr, th, BYPASS_CAPA); + } + RETURN(rc); } @@ -274,12 +283,16 @@ static int llog_osd_declare_write_rec(const struct lu_env *env, LASSERT(env); LASSERT(th); LASSERT(loghandle); + LASSERT(rec); + LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE); o = loghandle->lgh_obj; LASSERT(o); + lgi->lgi_buf.lb_len = sizeof(struct llog_log_hdr); + lgi->lgi_buf.lb_buf = NULL; /* each time we update header */ - rc = dt_declare_record_write(env, o, sizeof(struct llog_log_hdr), 0, + rc = dt_declare_record_write(env, o, &lgi->lgi_buf, 0, th); if (rc || idx == 0) /* if error or just header */ RETURN(rc); @@ -290,12 +303,18 @@ static int llog_osd_declare_write_rec(const struct lu_env *env, LASSERT(ergo(rc == 0, lgi->lgi_attr.la_valid & LA_SIZE)); if (rc) RETURN(rc); + + rc = dt_declare_punch(env, o, lgi->lgi_off, OBD_OBJECT_EOF, th); + if (rc) + RETURN(rc); } else { lgi->lgi_off = 0; } + lgi->lgi_buf.lb_len = rec->lrh_len; + lgi->lgi_buf.lb_buf = NULL; /* XXX: implement declared window or multi-chunks approach */ - rc = dt_declare_record_write(env, o, 32 * 1024, lgi->lgi_off, th); + rc = dt_declare_record_write(env, o, &lgi->lgi_buf, -1, th); RETURN(rc); } @@ -311,7 +330,7 @@ static int llog_osd_write_rec(const struct lu_env *env, struct llog_thread_info *lgi = llog_info(env); struct llog_log_hdr *llh; int reclen = rec->lrh_len; - int index, rc; + int index, rc, old_tail_idx; struct llog_rec_tail *lrt; struct dt_object *o; size_t left; @@ -373,9 +392,10 @@ static int llog_osd_write_rec(const struct lu_env *env, /* We assume that caller has set lgh_cur_* */ lgi->lgi_off = loghandle->lgh_cur_offset; CDEBUG(D_OTHER, - "modify record "LPX64": idx:%d/%u/%d, len:%u " + "modify record "DOSTID": idx:%d/%u/%d, len:%u " "offset %llu\n", - loghandle->lgh_id.lgl_oid, idx, rec->lrh_index, + POSTID(&loghandle->lgh_id.lgl_oi), idx, + rec->lrh_index, loghandle->lgh_cur_idx, rec->lrh_len, (long long)(lgi->lgi_off - sizeof(*llh))); if (rec->lrh_index != loghandle->lgh_cur_idx) { @@ -434,35 +454,51 @@ static int llog_osd_write_rec(const struct lu_env *env, /* The caller should make sure only 1 process access the lgh_last_idx, * Otherwise it might hit the assert.*/ LASSERT(index < LLOG_BITMAP_SIZE(llh)); - cfs_spin_lock(&loghandle->lgh_hdr_lock); + spin_lock(&loghandle->lgh_hdr_lock); if (ext2_set_bit(index, llh->llh_bitmap)) { CERROR("%s: index %u already set in log bitmap\n", o->do_lu.lo_dev->ld_obd->obd_name, index); - cfs_spin_unlock(&loghandle->lgh_hdr_lock); + spin_unlock(&loghandle->lgh_hdr_lock); LBUG(); /* should never happen */ } llh->llh_count++; - cfs_spin_unlock(&loghandle->lgh_hdr_lock); + spin_unlock(&loghandle->lgh_hdr_lock); + old_tail_idx = llh->llh_tail.lrt_index; llh->llh_tail.lrt_index = index; lgi->lgi_off = 0; rc = llog_osd_write_blob(env, o, &llh->llh_hdr, NULL, &lgi->lgi_off, th); if (rc) - RETURN(rc); + GOTO(out, rc); rc = dt_attr_get(env, o, &lgi->lgi_attr, NULL); if (rc) - RETURN(rc); + GOTO(out, rc); + LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); lgi->lgi_off = lgi->lgi_attr.la_size; rc = llog_osd_write_blob(env, o, rec, buf, &lgi->lgi_off, th); - if (rc) - RETURN(rc); - CDEBUG(D_RPCTRACE, "added record "LPX64": idx: %u, %u\n", - loghandle->lgh_id.lgl_oid, index, rec->lrh_len); +out: + /* cleanup llog for error case */ + if (rc) { + spin_lock(&loghandle->lgh_hdr_lock); + ext2_clear_bit(index, llh->llh_bitmap); + llh->llh_count--; + spin_unlock(&loghandle->lgh_hdr_lock); + + /* restore the header */ + loghandle->lgh_last_idx--; + llh->llh_tail.lrt_index = old_tail_idx; + lgi->lgi_off = 0; + llog_osd_write_blob(env, o, &llh->llh_hdr, NULL, + &lgi->lgi_off, th); + } + + CDEBUG(D_RPCTRACE, "added record "DOSTID": idx: %u, %u\n", + POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len); if (rc == 0 && reccookie) { reccookie->lgc_lgl = loghandle->lgh_id; reccookie->lgc_index = index; @@ -573,10 +609,10 @@ static int llog_osd_next_block(const struct lu_env *env, GOTO(out, rc); if (rc < sizeof(*tail)) { - CERROR("%s: invalid llog block at log id "LPU64"/%u " + CERROR("%s: invalid llog block at log id "DOSTID"/%u " "offset "LPU64"\n", o->do_lu.lo_dev->ld_obd->obd_name, - loghandle->lgh_id.lgl_oid, + POSTID(&loghandle->lgh_id.lgl_oi), loghandle->lgh_id.lgl_ogen, *cur_offset); GOTO(out, rc = -EINVAL); } @@ -599,10 +635,10 @@ static int llog_osd_next_block(const struct lu_env *env, /* this shouldn't happen */ if (tail->lrt_index == 0) { - CERROR("%s: invalid llog tail at log id "LPU64"/%u " + CERROR("%s: invalid llog tail at log id "DOSTID"/%u " "offset "LPU64"\n", o->do_lu.lo_dev->ld_obd->obd_name, - loghandle->lgh_id.lgl_oid, + POSTID(&loghandle->lgh_id.lgl_oi), loghandle->lgh_id.lgl_ogen, *cur_offset); GOTO(out, rc = -EINVAL); } @@ -681,10 +717,10 @@ static int llog_osd_prev_block(const struct lu_env *env, GOTO(out, rc); if (rc < sizeof(*tail)) { - CERROR("%s: invalid llog block at log id "LPU64"/%u " + CERROR("%s: invalid llog block at log id "DOSTID"/%u " "offset "LPU64"\n", o->do_lu.lo_dev->ld_obd->obd_name, - loghandle->lgh_id.lgl_oid, + POSTID(&loghandle->lgh_id.lgl_oi), loghandle->lgh_id.lgl_ogen, cur_offset); GOTO(out, rc = -EINVAL); } @@ -705,10 +741,10 @@ static int llog_osd_prev_block(const struct lu_env *env, /* this shouldn't happen */ if (tail->lrt_index == 0) { - CERROR("%s: invalid llog tail at log id "LPU64"/%u " + CERROR("%s: invalid llog tail at log id "DOSTID"/%u " "offset "LPU64"\n", o->do_lu.lo_dev->ld_obd->obd_name, - loghandle->lgh_id.lgl_oid, + POSTID(&loghandle->lgh_id.lgl_oi), loghandle->lgh_id.lgl_ogen, cur_offset); GOTO(out, rc = -EINVAL); } @@ -744,6 +780,11 @@ struct dt_object *llog_osd_dir_get(const struct lu_env *env, if (rc) return ERR_PTR(rc); dir = dt_locate(env, dt, &dti->dti_fid); + + if (!IS_ERR(dir) && !dt_try_as_dir(env, dir)) { + lu_object_put(env, &dir->do_lu); + return ERR_PTR(-ENOTDIR); + } } else { lu_object_get(&ctxt->loc_dir->do_lu); dir = ctxt->loc_dir; @@ -777,9 +818,9 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle, if (IS_ERR(ls)) RETURN(PTR_ERR(ls)); - cfs_mutex_lock(&ls->ls_los_mutex); - los = dt_los_find(ls, FID_SEQ_LLOG); - cfs_mutex_unlock(&ls->ls_los_mutex); + mutex_lock(&ls->ls_los_mutex); + los = dt_los_find(ls, name != NULL ? FID_SEQ_LLOG_NAME : FID_SEQ_LLOG); + mutex_unlock(&ls->ls_los_mutex); LASSERT(los); ls_device_put(env, ls); @@ -817,7 +858,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle, GOTO(out, rc); } - o = ls_locate(env, ls, &lgi->lgi_fid); + o = ls_locate(env, ls, &lgi->lgi_fid, NULL); if (IS_ERR(o)) GOTO(out_name, rc = PTR_ERR(o)); @@ -874,9 +915,8 @@ static int llog_osd_declare_create(const struct lu_env *env, if (rc) RETURN(rc); - rc = dt_declare_record_write(env, o, LLOG_CHUNK_SIZE, 0, th); - if (rc) - RETURN(rc); + /* do not declare header initialization here as it's declared + * in llog_osd_declare_write_rec() which is always called */ if (res->lgh_name) { struct dt_object *llog_dir; @@ -884,7 +924,6 @@ static int llog_osd_declare_create(const struct lu_env *env, llog_dir = llog_osd_dir_get(env, res->lgh_ctxt); if (IS_ERR(llog_dir)) RETURN(PTR_ERR(llog_dir)); - dt_declare_ref_add(env, o, th); logid_to_fid(&res->lgh_id, &lgi->lgi_fid); rc = dt_declare_insert(env, llog_dir, (struct dt_rec *)&lgi->lgi_fid, @@ -926,8 +965,7 @@ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res, rc = llog_osd_create_new_object(env, los, o, th); else rc = -EEXIST; - if (res->lgh_name) - dt_ref_add(env, o, th); + dt_write_unlock(env, o); if (rc) RETURN(rc); @@ -979,7 +1017,6 @@ static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle) static int llog_osd_destroy(const struct lu_env *env, struct llog_handle *loghandle) { - struct llog_thread_info *lgi = llog_info(env); struct llog_ctxt *ctxt; struct dt_object *o, *llog_dir = NULL; struct dt_device *d; @@ -1008,7 +1045,6 @@ static int llog_osd_destroy(const struct lu_env *env, if (IS_ERR(llog_dir)) GOTO(out_trans, rc = PTR_ERR(llog_dir)); - dt_declare_ref_del(env, o, th); name = loghandle->lgh_name; rc = dt_declare_delete(env, llog_dir, (struct dt_key *)name, th); @@ -1029,7 +1065,6 @@ static int llog_osd_destroy(const struct lu_env *env, dt_write_lock(env, o, 0); if (dt_object_exists(o)) { if (name) { - dt_ref_del(env, o, th); dt_read_lock(env, llog_dir, 0); rc = dt_delete(env, llog_dir, (struct dt_key *) name, @@ -1042,18 +1077,7 @@ static int llog_osd_destroy(const struct lu_env *env, GOTO(out_unlock, rc); } } - /* - * XXX: compatibility bits - * on old filesystems llogs are referenced by the name - * on the new ones they are referenced by OI and by - * the name - */ - rc = dt_attr_get(env, o, &lgi->lgi_attr, NULL); - if (rc) - GOTO(out_unlock, rc); - LASSERT(lgi->lgi_attr.la_nlink < 2); - if (lgi->lgi_attr.la_nlink == 1) - dt_ref_del(env, o, th); + dt_ref_del(env, o, th); rc = dt_destroy(env, o, th); if (rc) GOTO(out_unlock, rc); @@ -1071,11 +1095,9 @@ static int llog_osd_setup(const struct lu_env *env, struct obd_device *obd, struct obd_llog_group *olg, int ctxt_idx, struct obd_device *disk_obd) { - struct local_oid_storage *los; struct llog_thread_info *lgi = llog_info(env); struct llog_ctxt *ctxt; int rc = 0; - ENTRY; LASSERT(obd); @@ -1090,31 +1112,41 @@ static int llog_osd_setup(const struct lu_env *env, struct obd_device *obd, lgi->lgi_fid.f_oid = 1; lgi->lgi_fid.f_ver = 0; rc = local_oid_storage_init(env, disk_obd->obd_lvfs_ctxt.dt, - &lgi->lgi_fid, &los); + &lgi->lgi_fid, + &ctxt->loc_los_nameless); + if (rc != 0) + GOTO(out, rc); + + lgi->lgi_fid.f_seq = FID_SEQ_LLOG_NAME; + lgi->lgi_fid.f_oid = 1; + lgi->lgi_fid.f_ver = 0; + rc = local_oid_storage_init(env, disk_obd->obd_lvfs_ctxt.dt, + &lgi->lgi_fid, + &ctxt->loc_los_named); + if (rc != 0) { + local_oid_storage_fini(env, ctxt->loc_los_nameless); + ctxt->loc_los_nameless = NULL; + } + + GOTO(out, rc); + +out: llog_ctxt_put(ctxt); return rc; } static int llog_osd_cleanup(const struct lu_env *env, struct llog_ctxt *ctxt) { - struct dt_device *dt; - struct ls_device *ls; - struct local_oid_storage *los; - - LASSERT(ctxt->loc_exp->exp_obd); - dt = ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt; - ls = ls_device_get(dt); - if (IS_ERR(ls)) - RETURN(PTR_ERR(ls)); + if (ctxt->loc_los_nameless != NULL) { + local_oid_storage_fini(env, ctxt->loc_los_nameless); + ctxt->loc_los_nameless = NULL; + } - cfs_mutex_lock(&ls->ls_los_mutex); - los = dt_los_find(ls, FID_SEQ_LLOG); - cfs_mutex_unlock(&ls->ls_los_mutex); - if (los != NULL) { - dt_los_put(los); - local_oid_storage_fini(env, los); + if (ctxt->loc_los_named != NULL) { + local_oid_storage_fini(env, ctxt->loc_los_named); + ctxt->loc_los_named = NULL; } - ls_device_put(env, ls); + return 0; } @@ -1137,7 +1169,8 @@ EXPORT_SYMBOL(llog_osd_ops); /* reads the catalog list */ int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d, - int idx, int count, struct llog_catid *idarray) + int idx, int count, struct llog_catid *idarray, + const struct lu_fid *fid) { struct llog_thread_info *lgi = llog_info(env); struct dt_object *o = NULL; @@ -1151,7 +1184,7 @@ int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d, size = sizeof(*idarray) * count; lgi->lgi_off = idx * sizeof(*idarray); - lu_local_obj_fid(&lgi->lgi_fid, LLOG_CATALOGS_OID); + lgi->lgi_fid = *fid; o = dt_locate(env, d, &lgi->lgi_fid); if (IS_ERR(o)) RETURN(PTR_ERR(o)); @@ -1230,7 +1263,8 @@ EXPORT_SYMBOL(llog_osd_get_cat_list); /* writes the cat list */ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d, - int idx, int count, struct llog_catid *idarray) + int idx, int count, struct llog_catid *idarray, + const struct lu_fid *fid) { struct llog_thread_info *lgi = llog_info(env); struct dt_object *o = NULL; @@ -1244,8 +1278,8 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d, size = sizeof(*idarray) * count; lgi->lgi_off = idx * sizeof(*idarray); + lgi->lgi_fid = *fid; - lu_local_obj_fid(&lgi->lgi_fid, LLOG_CATALOGS_OID); o = dt_locate(env, d, &lgi->lgi_fid); if (IS_ERR(o)) RETURN(PTR_ERR(o)); @@ -1268,7 +1302,9 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d, if (IS_ERR(th)) GOTO(out, rc = PTR_ERR(th)); - rc = dt_declare_record_write(env, o, size, lgi->lgi_off, th); + lgi->lgi_buf.lb_len = size; + lgi->lgi_buf.lb_buf = idarray; + rc = dt_declare_record_write(env, o, &lgi->lgi_buf, lgi->lgi_off, th); if (rc) GOTO(out, rc); @@ -1276,8 +1312,6 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d, if (rc) GOTO(out_trans, rc); - lgi->lgi_buf.lb_buf = idarray; - lgi->lgi_buf.lb_len = size; rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); if (rc) CDEBUG(D_INODE, "error writeing CATALOGS: rc = %d\n", rc);