X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fobdclass%2Fllog_osd.c;h=6c80c73b998c8f919a16f08e0f7e641e26480da6;hb=a9c51c03eaecb5234ba754b738877323a35bed51;hp=57dcd76a549dd16cadcb31e56170463a61f36bb7;hpb=48f6c6f97823f0b64acaa17524b49132ab3058db;p=fs%2Flustre-release.git diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index 57dcd76..6c80c73 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -15,25 +15,28 @@ * * You should have received a copy of the GNU General Public License * version 2 along with this program; If not, see - * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf - * - * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara, - * CA 95054 USA or visit www.sun.com if you need additional information or - * have any questions. + * http://www.gnu.org/licenses/gpl-2.0.html * * GPL HEADER END */ /* - * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved. + * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2012, 2013, Intel Corporation. + * Copyright (c) 2012, 2014 Intel Corporation. */ /* * This file is part of Lustre, http://www.lustre.org/ * Lustre is a trademark of Sun Microsystems, Inc. + */ +/* + * lustre/obdclass/llog_osd.c + * + * Low level llog routines on top of OSD API * - * lustre/obdclass/llog_osd.c - low level llog routines on top of OSD API + * This file provides set of methods for llog operations on top of + * dt_device. It contains all supported llog_operations interfaces and + * supplimental functions. * * Author: Alexey Zhuravlev * Author: Mikhail Pershin @@ -49,14 +52,20 @@ #include "llog_internal.h" #include "local_storage.h" -/* - * - multi-chunks or big-declaration approach - * - use unique sequence instead of llog sb tracking unique ids - * - re-use existing environment - * - named llog support (can be used for testing only at the present) - * - llog_origin_connect() work with OSD API +/** + * Implementation of the llog_operations::lop_declare_create + * + * This function is a wrapper over local_storage API function + * local_object_declare_create(). + * + * \param[in] env execution environment + * \param[in] los local_storage for bottom storage device + * \param[in] o dt_object to create + * \param[in] th current transaction handle + * + * \retval 0 on successful declaration of the new object + * \retval negative error if declaration was failed */ - static int llog_osd_declare_new_object(const struct lu_env *env, struct local_oid_storage *los, struct dt_object *o, @@ -72,6 +81,20 @@ static int llog_osd_declare_new_object(const struct lu_env *env, &lgi->lgi_dof, th); } +/** + * Implementation of the llog_operations::lop_create + * + * This function is a wrapper over local_storage API function + * local_object_create(). + * + * \param[in] env execution environment + * \param[in] los local_storage for bottom storage device + * \param[in] o dt_object to create + * \param[in] th current transaction handle + * + * \retval 0 on successful creation of the new object + * \retval negative error if creation was failed + */ static int llog_osd_create_new_object(const struct lu_env *env, struct local_oid_storage *los, struct dt_object *o, @@ -87,10 +110,34 @@ static int llog_osd_create_new_object(const struct lu_env *env, &lgi->lgi_dof, th); } +/** + * Write a padding record to the llog + * + * This function writes a padding record to the end of llog. That may + * be needed if llog contains records of variable size, e.g. config logs + * or changelogs. + * The padding record just aligns llog to the LLOG_CHUNK_SIZE boundary if + * the current record doesn't fit in the remaining space. + * + * It allocates full length to avoid two separate writes for header and tail. + * Such 2-steps scheme needs extra protection and complex error handling. + * + * \param[in] env execution environment + * \param[in] o dt_object to create + * \param[in,out] off pointer to the padding start offset + * \param[in] len padding length + * \param[in] index index of the padding record in a llog + * \param[in] th current transaction handle + * + * \retval 0 on successful padding write + * \retval negative error if write failed + */ static int llog_osd_pad(const struct lu_env *env, struct dt_object *o, loff_t *off, int len, int index, struct thandle *th) { struct llog_thread_info *lgi = llog_info(env); + struct llog_rec_hdr *rec; + struct llog_rec_tail *tail; int rc; ENTRY; @@ -99,109 +146,41 @@ static int llog_osd_pad(const struct lu_env *env, struct dt_object *o, LASSERT(off); LASSERT(len >= LLOG_MIN_REC_SIZE && (len & 0x7) == 0); - lgi->lgi_tail.lrt_len = lgi->lgi_lrh.lrh_len = len; - lgi->lgi_tail.lrt_index = lgi->lgi_lrh.lrh_index = index; - lgi->lgi_lrh.lrh_type = LLOG_PAD_MAGIC; - - lgi->lgi_buf.lb_buf = &lgi->lgi_lrh; - lgi->lgi_buf.lb_len = sizeof(lgi->lgi_lrh); - dt_write_lock(env, o, 0); - rc = dt_record_write(env, o, &lgi->lgi_buf, off, th); - if (rc) { - CERROR("%s: error writing padding record: rc = %d\n", - o->do_lu.lo_dev->ld_obd->obd_name, rc); - GOTO(out, rc); - } + OBD_ALLOC(rec, len); + if (rec == NULL) + RETURN(-ENOMEM); - lgi->lgi_buf.lb_buf = &lgi->lgi_tail; - lgi->lgi_buf.lb_len = sizeof(lgi->lgi_tail); - *off += len - sizeof(lgi->lgi_lrh) - sizeof(lgi->lgi_tail); - rc = dt_record_write(env, o, &lgi->lgi_buf, off, th); - if (rc) - CERROR("%s: error writing padding record: rc = %d\n", - o->do_lu.lo_dev->ld_obd->obd_name, rc); -out: - dt_write_unlock(env, o); - RETURN(rc); -} - -static int llog_osd_write_blob(const struct lu_env *env, struct dt_object *o, - struct llog_rec_hdr *rec, void *buf, - loff_t *off, struct thandle *th) -{ - struct llog_thread_info *lgi = llog_info(env); - int buflen = rec->lrh_len; - int rc; - - ENTRY; - - LASSERT(env); - LASSERT(o); - - if (buflen == 0) - CWARN("0-length record\n"); - - CDEBUG(D_OTHER, "write blob with type %x, buf %p/%u at off %llu\n", - rec->lrh_type, buf, buflen, *off); - - lgi->lgi_attr.la_valid = LA_SIZE; - lgi->lgi_attr.la_size = *off; + rec->lrh_len = len; + rec->lrh_index = index; + rec->lrh_type = LLOG_PAD_MAGIC; - if (!buf) { - lgi->lgi_buf.lb_len = buflen; - lgi->lgi_buf.lb_buf = rec; - rc = dt_record_write(env, o, &lgi->lgi_buf, off, th); - if (rc) - CERROR("%s: error writing log record: rc = %d\n", - o->do_lu.lo_dev->ld_obd->obd_name, rc); - GOTO(out, rc); - } + tail = rec_tail(rec); + tail->lrt_len = len; + tail->lrt_index = index; - /* the buf case */ - /* protect the following 3 writes from concurrent read */ - dt_write_lock(env, o, 0); - rec->lrh_len = sizeof(*rec) + buflen + sizeof(lgi->lgi_tail); - lgi->lgi_buf.lb_len = sizeof(*rec); lgi->lgi_buf.lb_buf = rec; - rc = dt_record_write(env, o, &lgi->lgi_buf, off, th); - if (rc) { - CERROR("%s: error writing log hdr: rc = %d\n", - o->do_lu.lo_dev->ld_obd->obd_name, rc); - GOTO(out_unlock, rc); - } - - lgi->lgi_buf.lb_len = buflen; - lgi->lgi_buf.lb_buf = buf; - rc = dt_record_write(env, o, &lgi->lgi_buf, off, th); - if (rc) { - CERROR("%s: error writing log buffer: rc = %d\n", - o->do_lu.lo_dev->ld_obd->obd_name, rc); - GOTO(out_unlock, rc); - } - - lgi->lgi_tail.lrt_len = rec->lrh_len; - lgi->lgi_tail.lrt_index = rec->lrh_index; - lgi->lgi_buf.lb_len = sizeof(lgi->lgi_tail); - lgi->lgi_buf.lb_buf = &lgi->lgi_tail; + lgi->lgi_buf.lb_len = len; rc = dt_record_write(env, o, &lgi->lgi_buf, off, th); if (rc) - CERROR("%s: error writing log tail: rc = %d\n", + CERROR("%s: error writing padding record: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, rc); -out_unlock: - dt_write_unlock(env, o); - -out: - /* cleanup the content written above */ - if (rc) { - dt_punch(env, o, lgi->lgi_attr.la_size, OBD_OBJECT_EOF, th, - BYPASS_CAPA); - dt_attr_set(env, o, &lgi->lgi_attr, th, BYPASS_CAPA); - } - + OBD_FREE(rec, len); RETURN(rc); } +/** + * Implementation of the llog_operations::lop_read_header + * + * This function reads the current llog header from the bottom storage + * device. + * + * \param[in] env execution environment + * \param[in] handle llog handle of the current llog + * + * \retval 0 on successful header read + * \retval negative error if read failed + */ static int llog_osd_read_header(const struct lu_env *env, struct llog_handle *handle) { @@ -269,6 +248,25 @@ static int llog_osd_read_header(const struct lu_env *env, RETURN(0); } +/** + * Implementation of the llog_operations::lop_declare_write + * + * This function declares the new record write. + * + * \param[in] env execution environment + * \param[in] loghandle llog handle of the current llog + * \param[in] rec llog record header. This is a real header of the full + * llog record to write. This is the beginning of buffer + * to write, the length of buffer is stored in + * \a rec::lrh_len + * \param[in] idx index of the llog record. If \a idx == -1 then this is + * append case, otherwise \a idx is the index of record + * to modify + * \param[in] th current transaction handle + * + * \retval 0 on successful declaration + * \retval negative error if declaration failed + */ static int llog_osd_declare_write_rec(const struct lu_env *env, struct llog_handle *loghandle, struct llog_rec_hdr *rec, @@ -283,51 +281,69 @@ static int llog_osd_declare_write_rec(const struct lu_env *env, LASSERT(env); LASSERT(th); LASSERT(loghandle); + LASSERT(rec); + LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE); o = loghandle->lgh_obj; LASSERT(o); + lgi->lgi_buf.lb_len = sizeof(struct llog_log_hdr); + lgi->lgi_buf.lb_buf = NULL; /* each time we update header */ - rc = dt_declare_record_write(env, o, sizeof(struct llog_log_hdr), 0, + rc = dt_declare_record_write(env, o, &lgi->lgi_buf, 0, th); if (rc || idx == 0) /* if error or just header */ RETURN(rc); - if (dt_object_exists(o)) { - rc = dt_attr_get(env, o, &lgi->lgi_attr, BYPASS_CAPA); - lgi->lgi_off = lgi->lgi_attr.la_size; - LASSERT(ergo(rc == 0, lgi->lgi_attr.la_valid & LA_SIZE)); - if (rc) - RETURN(rc); - - rc = dt_declare_punch(env, o, lgi->lgi_off, OBD_OBJECT_EOF, th); - if (rc) - RETURN(rc); - } else { - lgi->lgi_off = 0; - } - + /** + * the pad record can be inserted so take into account double + * record size + */ + lgi->lgi_buf.lb_len = rec->lrh_len * 2; + lgi->lgi_buf.lb_buf = NULL; /* XXX: implement declared window or multi-chunks approach */ - rc = dt_declare_record_write(env, o, 32 * 1024, lgi->lgi_off, th); + rc = dt_declare_record_write(env, o, &lgi->lgi_buf, -1, th); RETURN(rc); } -/* returns negative in on error; 0 if success && reccookie == 0; 1 otherwise */ -/* appends if idx == -1, otherwise overwrites record idx. */ +/** + * Implementation of the llog_operations::lop_write + * + * This function writes the new record in the llog or modify the existed one. + * + * \param[in] env execution environment + * \param[in] loghandle llog handle of the current llog + * \param[in] rec llog record header. This is a real header of + * the full llog record to write. This is + * the beginning of buffer to write, the length + * of buffer is stored in \a rec::lrh_len + * \param[out] reccookie pointer to the cookie to return back if needed. + * It is used for further cancel of this llog + * record. + * \param[in] idx index of the llog record. If \a idx == -1 then + * this is append case, otherwise \a idx is + * the index of record to modify + * \param[in] th current transaction handle + * + * \retval 0 on successful write && \a reccookie == NULL + * 1 on successful write && \a reccookie != NULL + * \retval negative error if write failed + */ static int llog_osd_write_rec(const struct lu_env *env, struct llog_handle *loghandle, struct llog_rec_hdr *rec, - struct llog_cookie *reccookie, int cookiecount, - void *buf, int idx, struct thandle *th) + struct llog_cookie *reccookie, + int idx, struct thandle *th) { struct llog_thread_info *lgi = llog_info(env); struct llog_log_hdr *llh; int reclen = rec->lrh_len; - int index, rc, old_tail_idx; + int index, rc; struct llog_rec_tail *lrt; struct dt_object *o; size_t left; + bool header_is_updated = false; ENTRY; @@ -342,68 +358,100 @@ static int llog_osd_write_rec(const struct lu_env *env, rec->lrh_type, PFID(lu_object_fid(&o->do_lu))); /* record length should not bigger than LLOG_CHUNK_SIZE */ - if (buf) - rc = (reclen > LLOG_CHUNK_SIZE - sizeof(struct llog_rec_hdr) - - sizeof(struct llog_rec_tail)) ? -E2BIG : 0; - else - rc = (reclen > LLOG_CHUNK_SIZE) ? -E2BIG : 0; - if (rc) - RETURN(rc); + if (reclen > LLOG_CHUNK_SIZE) + RETURN(-E2BIG); rc = dt_attr_get(env, o, &lgi->lgi_attr, NULL); if (rc) RETURN(rc); - if (buf) - /* write_blob adds header and tail to lrh_len. */ - reclen = sizeof(*rec) + rec->lrh_len + - sizeof(struct llog_rec_tail); - - if (idx != -1) { - /* no header: only allowed to insert record 1 */ - if (idx != 1 && lgi->lgi_attr.la_size == 0) - LBUG(); - - if (idx && llh->llh_size && llh->llh_size != rec->lrh_len) - RETURN(-EINVAL); + /** + * The modification case. + * If idx set then the record with that index must be modified. + * There are three cases possible: + * 1) the common case is the llog header update (idx == 0) + * 2) the llog record modification during llog process. + * This is indicated by the \a loghandle::lgh_cur_idx > 0. + * In that case the \a loghandle::lgh_cur_offset + * 3) otherwise this is assumed that llog consist of records of + * fixed size, i.e. catalog. The llog header must has llh_size + * field equal to record size. The record offset is calculated + * just by /a idx value + * + * During modification we don't need extra header update because + * the bitmap and record count are not changed. The record header + * and tail remains the same too. + */ + if (idx != LLOG_NEXT_IDX) { + /* llog can be empty only when first record is being written */ + LASSERT(ergo(idx > 0, lgi->lgi_attr.la_size > 0)); - if (!ext2_test_bit(idx, llh->llh_bitmap)) + if (!ext2_test_bit(idx, llh->llh_bitmap)) { CERROR("%s: modify unset record %u\n", o->do_lu.lo_dev->ld_obd->obd_name, idx); - if (idx != rec->lrh_index) - CERROR("%s: index mismatch %d %u\n", + RETURN(-ENOENT); + } + + if (idx != rec->lrh_index) { + CERROR("%s: modify index mismatch %d %u\n", o->do_lu.lo_dev->ld_obd->obd_name, idx, rec->lrh_index); + RETURN(-EFAULT); + } - lgi->lgi_off = 0; - rc = llog_osd_write_blob(env, o, &llh->llh_hdr, NULL, - &lgi->lgi_off, th); - /* we are done if we only write the header or on error */ - if (rc || idx == 0) - RETURN(rc); + if (idx == LLOG_HEADER_IDX) { + /* llog header update */ + LASSERT(reclen == sizeof(struct llog_log_hdr)); + LASSERT(rec == &llh->llh_hdr); - if (buf) { - /* We assume that caller has set lgh_cur_* */ - lgi->lgi_off = loghandle->lgh_cur_offset; - CDEBUG(D_OTHER, - "modify record "DOSTID": idx:%d/%u/%d, len:%u " - "offset %llu\n", - POSTID(&loghandle->lgh_id.lgl_oi), idx, - rec->lrh_index, - loghandle->lgh_cur_idx, rec->lrh_len, - (long long)(lgi->lgi_off - sizeof(*llh))); - if (rec->lrh_index != loghandle->lgh_cur_idx) { - CERROR("%s: modify idx mismatch %u/%d\n", + lgi->lgi_off = 0; + lgi->lgi_buf.lb_len = reclen; + lgi->lgi_buf.lb_buf = rec; + rc = dt_record_write(env, o, &lgi->lgi_buf, + &lgi->lgi_off, th); + RETURN(rc); + } else if (loghandle->lgh_cur_idx > 0) { + /** + * The lgh_cur_offset can be used only if index is + * the same. + */ + if (idx != loghandle->lgh_cur_idx) { + CERROR("%s: modify index mismatch %d %d\n", o->do_lu.lo_dev->ld_obd->obd_name, idx, loghandle->lgh_cur_idx); RETURN(-EFAULT); } - } else { - /* Assumes constant lrh_len */ + + lgi->lgi_off = loghandle->lgh_cur_offset; + CDEBUG(D_OTHER, "modify record "DOSTID": idx:%d, " + "len:%u offset %llu\n", + POSTID(&loghandle->lgh_id.lgl_oi), idx, + rec->lrh_len, (long long)lgi->lgi_off); + } else if (llh->llh_size > 0) { + if (llh->llh_size != rec->lrh_len) { + CERROR("%s: wrong record size, llh_size is %u" + " but record size is %u\n", + o->do_lu.lo_dev->ld_obd->obd_name, + llh->llh_size, rec->lrh_len); + RETURN(-EINVAL); + } lgi->lgi_off = sizeof(*llh) + (idx - 1) * reclen; + } else { + /* This can be result of lgh_cur_idx is not set during + * llog processing or llh_size is not set to proper + * record size for fixed records llog. Therefore it is + * impossible to get record offset. */ + CERROR("%s: can't get record offset, idx:%d, " + "len:%u.\n", o->do_lu.lo_dev->ld_obd->obd_name, + idx, rec->lrh_len); + RETURN(-EFAULT); } - rc = llog_osd_write_blob(env, o, rec, buf, &lgi->lgi_off, th); + /* update only data, header and tail remain the same */ + lgi->lgi_off += sizeof(struct llog_rec_hdr); + lgi->lgi_buf.lb_len = REC_DATA_LEN(rec); + lgi->lgi_buf.lb_buf = REC_DATA(rec); + rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); if (rc == 0 && reccookie) { reccookie->lgc_lgl = loghandle->lgh_id; reccookie->lgc_index = idx; @@ -412,12 +460,15 @@ static int llog_osd_write_rec(const struct lu_env *env, RETURN(rc); } - /* Make sure that records don't cross a chunk boundary, so we can + /** + * The append case. + * The most common case of using llog. The new index is assigned to + * the new record, new bit is set in llog bitmap and llog count is + * incremented. + * + * Make sure that records don't cross a chunk boundary, so we can * process them page-at-a-time if needed. If it will cross a chunk * boundary, write in a fake (but referenced) entry to pad the chunk. - * - * We know that llog_current_log() will return a loghandle that is - * big enough to hold reclen, so all we care about is padding here. */ LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); lgi->lgi_off = lgi->lgi_attr.la_size; @@ -429,25 +480,30 @@ static int llog_osd_write_rec(const struct lu_env *env, rc = llog_osd_pad(env, o, &lgi->lgi_off, left, index, th); if (rc) RETURN(rc); - loghandle->lgh_last_idx++; /*for pad rec*/ + loghandle->lgh_last_idx++; /* for pad rec */ } /* if it's the last idx in log file, then return -ENOSPC */ if (loghandle->lgh_last_idx >= LLOG_BITMAP_SIZE(llh) - 1) RETURN(-ENOSPC); + /* increment the last_idx along with llh_tail index, they should + * be equal for a llog lifetime */ loghandle->lgh_last_idx++; index = loghandle->lgh_last_idx; + llh->llh_tail.lrt_index = index; + /** + * NB: the caller should make sure only 1 process access + * the lgh_last_idx, e.g. append should be exclusive. + * Otherwise it might hit the assert. + */ LASSERT(index < LLOG_BITMAP_SIZE(llh)); rec->lrh_index = index; - if (buf == NULL) { - lrt = (struct llog_rec_tail *)((char *)rec + rec->lrh_len - - sizeof(*lrt)); - lrt->lrt_len = rec->lrh_len; - lrt->lrt_index = rec->lrh_index; - } - /* The caller should make sure only 1 process access the lgh_last_idx, - * Otherwise it might hit the assert.*/ - LASSERT(index < LLOG_BITMAP_SIZE(llh)); + lrt = rec_tail(rec); + lrt->lrt_len = rec->lrh_len; + lrt->lrt_index = rec->lrh_index; + + /* the lgh_hdr_lock protects llog header data from concurrent + * update/cancel, the llh_count and llh_bitmap are protected */ spin_lock(&loghandle->lgh_hdr_lock); if (ext2_set_bit(index, llh->llh_bitmap)) { CERROR("%s: index %u already set in log bitmap\n", @@ -457,43 +513,30 @@ static int llog_osd_write_rec(const struct lu_env *env, } llh->llh_count++; spin_unlock(&loghandle->lgh_hdr_lock); - old_tail_idx = llh->llh_tail.lrt_index; - llh->llh_tail.lrt_index = index; lgi->lgi_off = 0; - rc = llog_osd_write_blob(env, o, &llh->llh_hdr, NULL, &lgi->lgi_off, - th); + lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len; + lgi->lgi_buf.lb_buf = &llh->llh_hdr; + rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); if (rc) GOTO(out, rc); + header_is_updated = true; rc = dt_attr_get(env, o, &lgi->lgi_attr, NULL); if (rc) GOTO(out, rc); LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); lgi->lgi_off = lgi->lgi_attr.la_size; + lgi->lgi_buf.lb_len = reclen; + lgi->lgi_buf.lb_buf = rec; + rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); + if (rc < 0) + GOTO(out, rc); - rc = llog_osd_write_blob(env, o, rec, buf, &lgi->lgi_off, th); - -out: - /* cleanup llog for error case */ - if (rc) { - spin_lock(&loghandle->lgh_hdr_lock); - ext2_clear_bit(index, llh->llh_bitmap); - llh->llh_count--; - spin_unlock(&loghandle->lgh_hdr_lock); - - /* restore the header */ - loghandle->lgh_last_idx--; - llh->llh_tail.lrt_index = old_tail_idx; - lgi->lgi_off = 0; - llog_osd_write_blob(env, o, &llh->llh_hdr, NULL, - &lgi->lgi_off, th); - } - - CDEBUG(D_RPCTRACE, "added record "DOSTID": idx: %u, %u\n", + CDEBUG(D_OTHER, "added record "DOSTID": idx: %u, %u\n", POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len); - if (rc == 0 && reccookie) { + if (reccookie != NULL) { reccookie->lgc_lgl = loghandle->lgh_id; reccookie->lgc_index = index; if ((rec->lrh_type == MDS_UNLINK_REC) || @@ -506,15 +549,36 @@ out: rc = 1; } RETURN(rc); +out: + /* cleanup llog for error case */ + spin_lock(&loghandle->lgh_hdr_lock); + ext2_clear_bit(index, llh->llh_bitmap); + llh->llh_count--; + spin_unlock(&loghandle->lgh_hdr_lock); + + /* restore llog last_idx */ + loghandle->lgh_last_idx--; + llh->llh_tail.lrt_index = loghandle->lgh_last_idx; + + /* restore the header on disk if it was written */ + if (header_is_updated) { + lgi->lgi_off = 0; + lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len; + lgi->lgi_buf.lb_buf = &llh->llh_hdr; + dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); + } + + RETURN(rc); } -/* We can skip reading at least as many log blocks as the number of +/** + * We can skip reading at least as many log blocks as the number of * minimum sized log records we are skipping. If it turns out * that we are not far enough along the log (because the * actual records are larger than minimum size) we just skip * some more records. */ -static void llog_skip_over(__u64 *off, int curr, int goal) +static inline void llog_skip_over(__u64 *off, int curr, int goal) { if (goal <= curr) return; @@ -522,10 +586,23 @@ static void llog_skip_over(__u64 *off, int curr, int goal) ~(LLOG_CHUNK_SIZE - 1); } -/* sets: - * - cur_offset to the furthest point read in the log file - * - cur_idx to the log index preceeding cur_offset - * returns -EIO/-EINVAL on error +/** + * Implementation of the llog_operations::lop_next_block + * + * This function finds the the next llog block to return which contains + * record with required index. It is main part of llog processing. + * + * \param[in] env execution environment + * \param[in] loghandle llog handle of the current llog + * \param[in,out] cur_idx index preceeding cur_offset + * \param[in] next_idx target index to find + * \param[in,out] cur_offset furtherst point read in the file + * \param[in] buf pointer to data buffer to fill + * \param[in] len required len to read, it is + * LLOG_CHUNK_SIZE usually. + * + * \retval 0 on successful buffer read + * \retval negative value on error */ static int llog_osd_next_block(const struct lu_env *env, struct llog_handle *loghandle, int *cur_idx, @@ -572,18 +649,7 @@ static int llog_osd_next_block(const struct lu_env *env, (*cur_offset & (LLOG_CHUNK_SIZE - 1)); lgi->lgi_buf.lb_buf = buf; - /* Note: read lock is not needed around la_size get above at - * the time of dt_attr_get(). There are only two cases that - * matter. Either la_size == cur_offset, in which case the - * entire read is skipped, or la_size > cur_offset and the loop - * is entered and this thread is blocked at dt_read_lock() - * until the write is completed. When the write completes, then - * the dt_read() will be done with the full length, and will - * get the full data. - */ - dt_read_lock(env, o, 0); rc = dt_read(env, o, &lgi->lgi_buf, cur_offset); - dt_read_unlock(env, o); if (rc < 0) { CERROR("%s: can't read llog block from log "DFID " offset "LPU64": rc = %d\n", @@ -654,6 +720,23 @@ out: return rc; } +/** + * Implementation of the llog_operations::lop_prev_block + * + * This function finds the llog block to return which contains + * record with required index but in reverse order - from end of llog + * to the beginning. + * It is main part of reverse llog processing. + * + * \param[in] env execution environment + * \param[in] loghandle llog handle of the current llog + * \param[in] prev_idx target index to find + * \param[in] buf pointer to data buffer to fill + * \param[in] len required len to read, it is LLOG_CHUNK_SIZE usually. + * + * \retval 0 on successful buffer read + * \retval negative value on error + */ static int llog_osd_prev_block(const struct lu_env *env, struct llog_handle *loghandle, int prev_idx, void *buf, int len) @@ -693,12 +776,7 @@ static int llog_osd_prev_block(const struct lu_env *env, lgi->lgi_buf.lb_len = len; lgi->lgi_buf.lb_buf = buf; - /* It is OK to have locking around dt_read() only, see - * comment in llog_osd_next_block for details - */ - dt_read_lock(env, o, 0); rc = dt_read(env, o, &lgi->lgi_buf, &cur_offset); - dt_read_unlock(env, o); if (rc < 0) { CERROR("%s: can't read llog block from log "DFID " offset "LPU64": rc = %d\n", @@ -760,6 +838,16 @@ out: return rc; } +/** + * This is helper function to get llog directory object. It is used by named + * llog operations to find/insert/delete llog entry from llog directory. + * + * \param[in] env execution environment + * \param[in] ctxt llog context + * + * \retval dt_object of llog directory + * \retval ERR_PTR of negative value on error + */ struct dt_object *llog_osd_dir_get(const struct lu_env *env, struct llog_ctxt *ctxt) { @@ -787,6 +875,27 @@ struct dt_object *llog_osd_dir_get(const struct lu_env *env, return dir; } +/** + * Implementation of the llog_operations::lop_open + * + * This function opens the llog by its logid or by name, it may open also + * non existent llog and assing then new id to it. + * The llog_open/llog_close pair works similar to lu_object_find/put, + * the object may not exist prior open. The result of open is just dt_object + * in the llog header. + * + * \param[in] env execution environment + * \param[in] handle llog handle of the current llog + * \param[in] logid logid of llog to open (nameless llog) + * \param[in] name name of llog to open (named llog) + * \param[in] open_param + * LLOG_OPEN_NEW - new llog, may not exist + * LLOG_OPEN_EXIST - old llog, must exist + * + * \retval 0 on successful open, llog_handle::lgh_obj + * contains the dt_object of the llog. + * \retval negative value on error + */ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle, struct llog_logid *logid, char *name, enum llog_open_param open_param) @@ -852,7 +961,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle, GOTO(out, rc); } - o = ls_locate(env, ls, &lgi->lgi_fid); + o = ls_locate(env, ls, &lgi->lgi_fid, NULL); if (IS_ERR(o)) GOTO(out_name, rc = PTR_ERR(o)); @@ -877,6 +986,16 @@ out: RETURN(rc); } +/** + * Implementation of the llog_operations::lop_exist + * + * This function checks that llog exists on storage. + * + * \param[in] handle llog handle of the current llog + * + * \retval true if llog object exists and is not just destroyed + * \retval false if llog doesn't exist or just destroyed + */ static int llog_osd_exist(struct llog_handle *handle) { LASSERT(handle->lgh_obj); @@ -884,6 +1003,19 @@ static int llog_osd_exist(struct llog_handle *handle) !lu_object_is_dying(handle->lgh_obj->do_lu.lo_header)); } +/** + * Implementation of the llog_operations::lop_declare_create + * + * This function declares the llog create. It declares also name insert + * into llog directory in case of named llog. + * + * \param[in] env execution environment + * \param[in] res llog handle of the current llog + * \param[in] th current transaction handle + * + * \retval 0 on successful create declaration + * \retval negative value on error + */ static int llog_osd_declare_create(const struct lu_env *env, struct llog_handle *res, struct thandle *th) { @@ -909,9 +1041,8 @@ static int llog_osd_declare_create(const struct lu_env *env, if (rc) RETURN(rc); - rc = dt_declare_record_write(env, o, LLOG_CHUNK_SIZE, 0, th); - if (rc) - RETURN(rc); + /* do not declare header initialization here as it's declared + * in llog_osd_declare_write_rec() which is always called */ if (res->lgh_name) { struct dt_object *llog_dir; @@ -932,8 +1063,19 @@ static int llog_osd_declare_create(const struct lu_env *env, RETURN(rc); } -/* This is a callback from the llog_* functions. - * Assumes caller has already pushed us into the kernel context. */ +/** + * Implementation of the llog_operations::lop_create + * + * This function creates the llog according with llog_handle::lgh_obj + * and llog_handle::lgh_name. + * + * \param[in] env execution environment + * \param[in] res llog handle of the current llog + * \param[in] th current transaction handle + * + * \retval 0 on successful create + * \retval negative value on error + */ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res, struct thandle *th) { @@ -988,6 +1130,18 @@ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res, RETURN(rc); } +/** + * Implementation of the llog_operations::lop_close + * + * This function closes the llog. It just put llog object and referenced + * local storage. + * + * \param[in] env execution environment + * \param[in] handle llog handle of the current llog + * + * \retval 0 on successful llog close + * \retval negative value on error + */ static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle) { struct local_oid_storage *los; @@ -1009,6 +1163,20 @@ static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle) RETURN(rc); } +/** + * Implementation of the llog_operations::lop_destroy + * + * This function destroys the llog and deletes also entry in the + * llog directory in case of named llog. Llog should be opened prior that. + * Destroy method is not part of external transaction and does everything + * inside. + * + * \param[in] env execution environment + * \param[in] loghandle llog handle of the current llog + * + * \retval 0 on successful destroy + * \retval negative value on error + */ static int llog_osd_destroy(const struct lu_env *env, struct llog_handle *loghandle) { @@ -1086,15 +1254,28 @@ out_trans: RETURN(rc); } +/** + * Implementation of the llog_operations::lop_setup + * + * This function setup the llog on local storage. + * + * \param[in] env execution environment + * \param[in] obd obd device the llog belongs to + * \param[in] olg the llog group, it is always zero group now. + * \param[in] ctxt_idx the llog index, it defines the purpose of this llog. + * Every new llog type have to use own index. + * \param[in] disk_obd the storage obd, where llog is stored. + * + * \retval 0 on successful llog setup + * \retval negative value on error + */ static int llog_osd_setup(const struct lu_env *env, struct obd_device *obd, struct obd_llog_group *olg, int ctxt_idx, struct obd_device *disk_obd) { - struct local_oid_storage *los; struct llog_thread_info *lgi = llog_info(env); struct llog_ctxt *ctxt; int rc = 0; - ENTRY; LASSERT(obd); @@ -1109,44 +1290,51 @@ static int llog_osd_setup(const struct lu_env *env, struct obd_device *obd, lgi->lgi_fid.f_oid = 1; lgi->lgi_fid.f_ver = 0; rc = local_oid_storage_init(env, disk_obd->obd_lvfs_ctxt.dt, - &lgi->lgi_fid, &los); - if (rc < 0) - return rc; + &lgi->lgi_fid, + &ctxt->loc_los_nameless); + if (rc != 0) + GOTO(out, rc); lgi->lgi_fid.f_seq = FID_SEQ_LLOG_NAME; lgi->lgi_fid.f_oid = 1; lgi->lgi_fid.f_ver = 0; rc = local_oid_storage_init(env, disk_obd->obd_lvfs_ctxt.dt, - &lgi->lgi_fid, &los); + &lgi->lgi_fid, + &ctxt->loc_los_named); + if (rc != 0) { + local_oid_storage_fini(env, ctxt->loc_los_nameless); + ctxt->loc_los_nameless = NULL; + } + + GOTO(out, rc); + +out: llog_ctxt_put(ctxt); return rc; } +/** + * Implementation of the llog_operations::lop_cleanup + * + * This function cleanups the llog on local storage. + * + * \param[in] env execution environment + * \param[in] ctxt the llog context + * + * \retval 0 + */ static int llog_osd_cleanup(const struct lu_env *env, struct llog_ctxt *ctxt) { - struct dt_device *dt; - struct ls_device *ls; - struct local_oid_storage *los, *nlos; - - LASSERT(ctxt->loc_exp->exp_obd); - dt = ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt; - ls = ls_device_get(dt); - if (IS_ERR(ls)) - RETURN(PTR_ERR(ls)); - - mutex_lock(&ls->ls_los_mutex); - los = dt_los_find(ls, FID_SEQ_LLOG); - nlos = dt_los_find(ls, FID_SEQ_LLOG_NAME); - mutex_unlock(&ls->ls_los_mutex); - if (los != NULL) { - dt_los_put(los); - local_oid_storage_fini(env, los); + if (ctxt->loc_los_nameless != NULL) { + local_oid_storage_fini(env, ctxt->loc_los_nameless); + ctxt->loc_los_nameless = NULL; } - if (nlos != NULL) { - dt_los_put(nlos); - local_oid_storage_fini(env, nlos); + + if (ctxt->loc_los_named != NULL) { + local_oid_storage_fini(env, ctxt->loc_los_named); + ctxt->loc_los_named = NULL; } - ls_device_put(env, ls); + return 0; } @@ -1167,9 +1355,30 @@ struct llog_operations llog_osd_ops = { }; EXPORT_SYMBOL(llog_osd_ops); -/* reads the catalog list */ +/** + * Read the special file which contains the list of llog catalogs IDs + * + * This function reads the CATALOGS file which contains the array of llog + * catalogs IDs. The main purpose of this file is to store OSP llogs indexed + * by OST/MDT number. + * + * \param[in] env execution environment + * \param[in] d corresponding storage device + * \param[in] idx position to start from, usually OST/MDT index + * \param[in] count how many catalog IDs to read + * \param[out] idarray the buffer for the data. If it is NULL then + * function returns just number of catalog IDs + * in the file. + * \param[in] fid LLOG_CATALOGS_OID for CATALOG object + * + * \retval 0 on successful read of catalog IDs + * \retval negative value on error + * \retval positive value which is number of records in + * the file if \a idarray is NULL + */ int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d, - int idx, int count, struct llog_catid *idarray) + int idx, int count, struct llog_catid *idarray, + const struct lu_fid *fid) { struct llog_thread_info *lgi = llog_info(env); struct dt_object *o = NULL; @@ -1183,8 +1392,7 @@ int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d, size = sizeof(*idarray) * count; lgi->lgi_off = idx * sizeof(*idarray); - lu_local_obj_fid(&lgi->lgi_fid, LLOG_CATALOGS_OID); - + lgi->lgi_fid = *fid; o = dt_locate(env, d, &lgi->lgi_fid); if (IS_ERR(o)) RETURN(PTR_ERR(o)); @@ -1240,7 +1448,7 @@ out_trans: /* read for new ost index or for empty file */ memset(idarray, 0, size); - if (lgi->lgi_attr.la_size < lgi->lgi_off + size) + if (lgi->lgi_attr.la_size <= lgi->lgi_off) GOTO(out, rc = 0); if (lgi->lgi_attr.la_size < lgi->lgi_off + size) size = lgi->lgi_attr.la_size - lgi->lgi_off; @@ -1261,24 +1469,40 @@ out: } EXPORT_SYMBOL(llog_osd_get_cat_list); -/* writes the cat list */ +/** + * Write the special file which contains the list of llog catalogs IDs + * + * This function writes the CATALOG file which contains the array of llog + * catalogs IDs. It is used mostly to store OSP llogs indexed by OST/MDT + * number. + * + * \param[in] env execution environment + * \param[in] d corresponding storage device + * \param[in] idx position to start from, usually OST/MDT index + * \param[in] count how many catalog IDs to write + * \param[out] idarray the buffer with the data to write. + * \param[in] fid LLOG_CATALOGS_OID for CATALOG object + * + * \retval 0 on successful write of catalog IDs + * \retval negative value on error + */ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d, - int idx, int count, struct llog_catid *idarray) + int idx, int count, struct llog_catid *idarray, + const struct lu_fid *fid) { struct llog_thread_info *lgi = llog_info(env); struct dt_object *o = NULL; struct thandle *th; int rc, size; - if (!count) + if (count == 0) RETURN(0); LASSERT(d); size = sizeof(*idarray) * count; lgi->lgi_off = idx * sizeof(*idarray); - - lu_local_obj_fid(&lgi->lgi_fid, LLOG_CATALOGS_OID); + lgi->lgi_fid = *fid; o = dt_locate(env, d, &lgi->lgi_fid); if (IS_ERR(o)) @@ -1302,7 +1526,9 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d, if (IS_ERR(th)) GOTO(out, rc = PTR_ERR(th)); - rc = dt_declare_record_write(env, o, size, lgi->lgi_off, th); + lgi->lgi_buf.lb_len = size; + lgi->lgi_buf.lb_buf = idarray; + rc = dt_declare_record_write(env, o, &lgi->lgi_buf, lgi->lgi_off, th); if (rc) GOTO(out, rc); @@ -1310,11 +1536,10 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d, if (rc) GOTO(out_trans, rc); - lgi->lgi_buf.lb_buf = idarray; - lgi->lgi_buf.lb_len = size; rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); if (rc) - CDEBUG(D_INODE, "error writeing CATALOGS: rc = %d\n", rc); + CDEBUG(D_INODE, "can't write CATALOGS at index %d: rc = %d\n", + idx, rc); out_trans: dt_trans_stop(env, d, th); out: