* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2011, 2012, Intel, Inc.
+ * Copyright (c) 2012, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
lgi->lgi_buf.lb_buf = &lgi->lgi_lrh;
lgi->lgi_buf.lb_len = sizeof(lgi->lgi_lrh);
+ dt_write_lock(env, o, 0);
rc = dt_record_write(env, o, &lgi->lgi_buf, off, th);
if (rc) {
CERROR("%s: error writing padding record: rc = %d\n",
o->do_lu.lo_dev->ld_obd->obd_name, rc);
- RETURN(rc);
+ GOTO(out, rc);
}
lgi->lgi_buf.lb_buf = &lgi->lgi_tail;
if (rc)
CERROR("%s: error writing padding record: rc = %d\n",
o->do_lu.lo_dev->ld_obd->obd_name, rc);
-
+out:
+ dt_write_unlock(env, o);
RETURN(rc);
}
}
/* the buf case */
+ /* protect the following 3 writes from concurrent read */
+ dt_write_lock(env, o, 0);
rec->lrh_len = sizeof(*rec) + buflen + sizeof(lgi->lgi_tail);
lgi->lgi_buf.lb_len = sizeof(*rec);
lgi->lgi_buf.lb_buf = rec;
CERROR("%s: error writing log tail: rc = %d\n",
o->do_lu.lo_dev->ld_obd->obd_name, rc);
out:
+ dt_write_unlock(env, o);
RETURN(rc);
}
/* The caller should make sure only 1 process access the lgh_last_idx,
* Otherwise it might hit the assert.*/
LASSERT(index < LLOG_BITMAP_SIZE(llh));
- cfs_spin_lock(&loghandle->lgh_hdr_lock);
+ spin_lock(&loghandle->lgh_hdr_lock);
if (ext2_set_bit(index, llh->llh_bitmap)) {
CERROR("%s: index %u already set in log bitmap\n",
o->do_lu.lo_dev->ld_obd->obd_name, index);
- cfs_spin_unlock(&loghandle->lgh_hdr_lock);
+ spin_unlock(&loghandle->lgh_hdr_lock);
LBUG(); /* should never happen */
}
llh->llh_count++;
- cfs_spin_unlock(&loghandle->lgh_hdr_lock);
+ spin_unlock(&loghandle->lgh_hdr_lock);
llh->llh_tail.lrt_index = index;
lgi->lgi_off = 0;
lgi->lgi_buf.lb_len = LLOG_CHUNK_SIZE -
(*cur_offset & (LLOG_CHUNK_SIZE - 1));
lgi->lgi_buf.lb_buf = buf;
+
+ /* Note: read lock is not needed around la_size get above at
+ * the time of dt_attr_get(). There are only two cases that
+ * matter. Either la_size == cur_offset, in which case the
+ * entire read is skipped, or la_size > cur_offset and the loop
+ * is entered and this thread is blocked at dt_read_lock()
+ * until the write is completed. When the write completes, then
+ * the dt_read() will be done with the full length, and will
+ * get the full data.
+ */
+ dt_read_lock(env, o, 0);
rc = dt_read(env, o, &lgi->lgi_buf, cur_offset);
+ dt_read_unlock(env, o);
if (rc < 0) {
CERROR("%s: can't read llog block from log "DFID
" offset "LPU64": rc = %d\n",
lgi->lgi_buf.lb_len = len;
lgi->lgi_buf.lb_buf = buf;
+ /* It is OK to have locking around dt_read() only, see
+ * comment in llog_osd_next_block for details
+ */
+ dt_read_lock(env, o, 0);
rc = dt_read(env, o, &lgi->lgi_buf, &cur_offset);
+ dt_read_unlock(env, o);
if (rc < 0) {
CERROR("%s: can't read llog block from log "DFID
" offset "LPU64": rc = %d\n",
return rc;
}
+/**
+ * Get the llog directory object for the given llog context.
+ *
+ * If the context caches no directory (\a ctxt->loc_dir == NULL), look up
+ * the backend filesystem root FID via dt_root_get() and locate its
+ * dt_object.  Otherwise take an additional reference on the cached
+ * \a ctxt->loc_dir and return it.
+ *
+ * In both cases the caller owns one reference on the returned object and
+ * must release it with lu_object_put() when finished (as the callers in
+ * this file do after their lookup/insert/delete operations).
+ *
+ * \param[in] env	execution environment
+ * \param[in] ctxt	llog context to get the directory for
+ *
+ * \retval	pointer to the directory dt_object on success
+ * \retval	ERR_PTR(-errno) if the root FID cannot be obtained
+ *		(callers also check the dt_locate() result with IS_ERR)
+ */
+struct dt_object *llog_osd_dir_get(const struct lu_env *env,
+				   struct llog_ctxt *ctxt)
+{
+	struct dt_device	*dt;
+	struct dt_thread_info	*dti = dt_info(env);
+	struct dt_object	*dir;
+	int			 rc;
+
+	dt = ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt;
+	if (ctxt->loc_dir == NULL) {
+		/* no cached directory: resolve the filesystem root */
+		rc = dt_root_get(env, dt, &dti->dti_fid);
+		if (rc)
+			return ERR_PTR(rc);
+		dir = dt_locate(env, dt, &dti->dti_fid);
+	} else {
+		/* pin the cached directory for the caller's use */
+		lu_object_get(&ctxt->loc_dir->do_lu);
+		dir = ctxt->loc_dir;
+	}
+
+	return dir;
+}
+
static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle,
struct llog_logid *logid, char *name,
enum llog_open_param open_param)
if (IS_ERR(ls))
RETURN(PTR_ERR(ls));
- cfs_mutex_lock(&ls->ls_los_mutex);
+ mutex_lock(&ls->ls_los_mutex);
los = dt_los_find(ls, FID_SEQ_LLOG);
- cfs_mutex_unlock(&ls->ls_los_mutex);
+ mutex_unlock(&ls->ls_los_mutex);
LASSERT(los);
ls_device_put(env, ls);
if (logid != NULL) {
logid_to_fid(logid, &lgi->lgi_fid);
} else if (name) {
- LASSERT(ctxt->loc_dir);
- dt_read_lock(env, ctxt->loc_dir, 0);
- rc = dt_lookup_dir(env, ctxt->loc_dir, name, &lgi->lgi_fid);
- dt_read_unlock(env, ctxt->loc_dir);
+ struct dt_object *llog_dir;
+
+ llog_dir = llog_osd_dir_get(env, ctxt);
+ if (IS_ERR(llog_dir))
+ GOTO(out, rc = PTR_ERR(llog_dir));
+ dt_read_lock(env, llog_dir, 0);
+ rc = dt_lookup_dir(env, llog_dir, name, &lgi->lgi_fid);
+ dt_read_unlock(env, llog_dir);
+ lu_object_put(env, &llog_dir->do_lu);
if (rc == -ENOENT && open_param == LLOG_OPEN_NEW) {
/* generate fid for new llog */
rc = local_object_fid_generate(env, los,
RETURN(rc);
if (res->lgh_name) {
- LASSERT(res->lgh_ctxt->loc_dir);
+ struct dt_object *llog_dir;
+
+ llog_dir = llog_osd_dir_get(env, res->lgh_ctxt);
+ if (IS_ERR(llog_dir))
+ RETURN(PTR_ERR(llog_dir));
dt_declare_ref_add(env, o, th);
logid_to_fid(&res->lgh_id, &lgi->lgi_fid);
- rc = dt_declare_insert(env, res->lgh_ctxt->loc_dir,
+ rc = dt_declare_insert(env, llog_dir,
(struct dt_rec *)&lgi->lgi_fid,
(struct dt_key *)res->lgh_name, th);
+ lu_object_put(env, &llog_dir->do_lu);
if (rc)
CERROR("%s: can't declare named llog %s: rc = %d\n",
o->do_lu.lo_dev->ld_obd->obd_name,
RETURN(rc);
if (res->lgh_name) {
- LASSERT(res->lgh_ctxt->loc_dir);
+ struct dt_object *llog_dir;
+
+ llog_dir = llog_osd_dir_get(env, res->lgh_ctxt);
+ if (IS_ERR(llog_dir))
+ RETURN(PTR_ERR(llog_dir));
+
logid_to_fid(&res->lgh_id, &lgi->lgi_fid);
- dt_read_lock(env, res->lgh_ctxt->loc_dir, 0);
- rc = dt_insert(env, res->lgh_ctxt->loc_dir,
+ dt_read_lock(env, llog_dir, 0);
+ rc = dt_insert(env, llog_dir,
(struct dt_rec *)&lgi->lgi_fid,
(struct dt_key *)res->lgh_name,
th, BYPASS_CAPA, 1);
- dt_read_unlock(env, res->lgh_ctxt->loc_dir);
+ dt_read_unlock(env, llog_dir);
+ lu_object_put(env, &llog_dir->do_lu);
if (rc)
CERROR("%s: can't create named llog %s: rc = %d\n",
o->do_lu.lo_dev->ld_obd->obd_name,
static int llog_osd_destroy(const struct lu_env *env,
struct llog_handle *loghandle)
{
+ struct llog_thread_info *lgi = llog_info(env);
struct llog_ctxt *ctxt;
- struct dt_object *o;
+ struct dt_object *o, *llog_dir = NULL;
struct dt_device *d;
struct thandle *th;
char *name = NULL;
RETURN(PTR_ERR(th));
if (loghandle->lgh_name) {
- LASSERT(ctxt->loc_dir);
+ llog_dir = llog_osd_dir_get(env, ctxt);
+ if (IS_ERR(llog_dir))
+ GOTO(out_trans, rc = PTR_ERR(llog_dir));
+
dt_declare_ref_del(env, o, th);
name = loghandle->lgh_name;
- rc = dt_declare_delete(env, ctxt->loc_dir,
+ rc = dt_declare_delete(env, llog_dir,
(struct dt_key *)name, th);
if (rc)
GOTO(out_trans, rc);
if (dt_object_exists(o)) {
if (name) {
dt_ref_del(env, o, th);
- dt_read_lock(env, ctxt->loc_dir, 0);
- rc = dt_delete(env, ctxt->loc_dir,
+ dt_read_lock(env, llog_dir, 0);
+ rc = dt_delete(env, llog_dir,
(struct dt_key *) name,
th, BYPASS_CAPA);
- dt_read_unlock(env, ctxt->loc_dir);
+ dt_read_unlock(env, llog_dir);
if (rc) {
CERROR("%s: can't remove llog %s: rc = %d\n",
o->do_lu.lo_dev->ld_obd->obd_name,
GOTO(out_unlock, rc);
}
}
- dt_ref_del(env, o, th);
+ /*
+ * XXX: compatibility bits
+ * on old filesystems llogs are referenced by the name
+ * on the new ones they are referenced by OI and by
+ * the name
+ */
+ rc = dt_attr_get(env, o, &lgi->lgi_attr, NULL);
+ if (rc)
+ GOTO(out_unlock, rc);
+ LASSERT(lgi->lgi_attr.la_nlink < 2);
+ if (lgi->lgi_attr.la_nlink == 1)
+ dt_ref_del(env, o, th);
rc = dt_destroy(env, o, th);
if (rc)
GOTO(out_unlock, rc);
dt_write_unlock(env, o);
out_trans:
dt_trans_stop(env, d, th);
+ if (llog_dir != NULL)
+ lu_object_put(env, &llog_dir->do_lu);
RETURN(rc);
}
if (IS_ERR(ls))
RETURN(PTR_ERR(ls));
- cfs_mutex_lock(&ls->ls_los_mutex);
+ mutex_lock(&ls->ls_los_mutex);
los = dt_los_find(ls, FID_SEQ_LLOG);
- cfs_mutex_unlock(&ls->ls_los_mutex);
+ mutex_unlock(&ls->ls_los_mutex);
if (los != NULL) {
dt_los_put(los);
local_oid_storage_fini(env, los);
lgi->lgi_off = idx * sizeof(*idarray);
lu_local_obj_fid(&lgi->lgi_fid, LLOG_CATALOGS_OID);
+
o = dt_locate(env, d, &lgi->lgi_fid);
if (IS_ERR(o))
RETURN(PTR_ERR(o));
lgi->lgi_off = idx * sizeof(*idarray);
lu_local_obj_fid(&lgi->lgi_fid, LLOG_CATALOGS_OID);
+
o = dt_locate(env, d, &lgi->lgi_fid);
if (IS_ERR(o))
RETURN(PTR_ERR(o));