X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fobdclass%2Fllog_osd.c;h=a79b1ec05e9dc93a2592121deabd71fa91dabbd0;hb=df45994ddcf5e6fdc379b3e1d43f1d26ba321a0e;hp=8295d390bcbf8400c0daa2ab57d2718ac6b89c34;hpb=b69b7de30c3977cb69a741099218bc4a81752717;p=fs%2Flustre-release.git diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index 8295d39..a79b1ec 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -116,7 +116,7 @@ static int llog_osd_create_new_object(const struct lu_env *env, * This function writes a padding record to the end of llog. That may * be needed if llog contains records of variable size, e.g. config logs * or changelogs. - * The padding record just aligns llog to the LLOG_CHUNK_SIZE boundary if + * The padding record just aligns llog to the llog chunk_size boundary if * the current record doesn't fit in the remaining space. * * It allocates full length to avoid two separate writes for header and tail. @@ -187,18 +187,17 @@ static int llog_osd_read_header(const struct lu_env *env, struct llog_rec_hdr *llh_hdr; struct dt_object *o; struct llog_thread_info *lgi; + enum llog_flag flags; int rc; ENTRY; - LASSERT(sizeof(*handle->lgh_hdr) == LLOG_CHUNK_SIZE); - o = handle->lgh_obj; LASSERT(o); lgi = llog_info(env); - rc = dt_attr_get(env, o, &lgi->lgi_attr, NULL); + rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) RETURN(rc); @@ -209,19 +208,25 @@ static int llog_osd_read_header(const struct lu_env *env, RETURN(LLOG_EEMPTY); } + flags = handle->lgh_hdr->llh_flags; + lgi->lgi_off = 0; lgi->lgi_buf.lb_buf = handle->lgh_hdr; - lgi->lgi_buf.lb_len = LLOG_CHUNK_SIZE; - - rc = dt_record_read(env, o, &lgi->lgi_buf, &lgi->lgi_off); - if (rc) { - CERROR("%s: error reading log header from "DFID": rc = %d\n", + lgi->lgi_buf.lb_len = handle->lgh_hdr_size; + rc = dt_read(env, o, &lgi->lgi_buf, &lgi->lgi_off); + llh_hdr = &handle->lgh_hdr->llh_hdr; + if (rc < sizeof(*llh_hdr) || rc < llh_hdr->lrh_len) { + CERROR("%s: 
error reading "DFID" log header size %d: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, - PFID(lu_object_fid(&o->do_lu)), rc); + PFID(lu_object_fid(&o->do_lu)), rc < 0 ? 0 : rc, + -EFAULT); + + if (rc >= 0) + rc = -EFAULT; + RETURN(rc); } - llh_hdr = &handle->lgh_hdr->llh_hdr; if (LLOG_REC_HDR_NEEDS_SWABBING(llh_hdr)) lustre_swab_llog_hdr(handle->lgh_hdr); @@ -232,18 +237,31 @@ static int llog_osd_read_header(const struct lu_env *env, PFID(lu_object_fid(&o->do_lu)), llh_hdr->lrh_type, LLOG_HDR_MAGIC); RETURN(-EIO); - } else if (llh_hdr->lrh_len != LLOG_CHUNK_SIZE) { + } else if (llh_hdr->lrh_len < LLOG_MIN_CHUNK_SIZE || + llh_hdr->lrh_len > handle->lgh_hdr_size) { CERROR("%s: incorrectly sized log %s "DFID" header: " - "%#x (expected %#x)\n" + "%#x (expected at least %#x)\n" "you may need to re-run lconf --write_conf.\n", o->do_lu.lo_dev->ld_obd->obd_name, handle->lgh_name ? handle->lgh_name : "", PFID(lu_object_fid(&o->do_lu)), - llh_hdr->lrh_len, LLOG_CHUNK_SIZE); + llh_hdr->lrh_len, LLOG_MIN_CHUNK_SIZE); + RETURN(-EIO); + } else if (LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index > + LLOG_HDR_BITMAP_SIZE(handle->lgh_hdr) || + LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len != + llh_hdr->lrh_len) { + CERROR("%s: incorrectly sized log %s "DFID" tailer: " + "%#x : rc = %d\n", + o->do_lu.lo_dev->ld_obd->obd_name, + handle->lgh_name ? 
handle->lgh_name : "", + PFID(lu_object_fid(&o->do_lu)), + LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len, -EIO); RETURN(-EIO); } - handle->lgh_last_idx = handle->lgh_hdr->llh_tail.lrt_index; + handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK); + handle->lgh_last_idx = LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index; RETURN(0); } @@ -273,6 +291,7 @@ static int llog_osd_declare_write_rec(const struct lu_env *env, int idx, struct thandle *th) { struct llog_thread_info *lgi = llog_info(env); + __u32 chunk_size; struct dt_object *o; int rc; @@ -282,12 +301,13 @@ static int llog_osd_declare_write_rec(const struct lu_env *env, LASSERT(th); LASSERT(loghandle); LASSERT(rec); - LASSERT(rec->lrh_len <= LLOG_CHUNK_SIZE); + LASSERT(rec->lrh_len <= loghandle->lgh_ctxt->loc_chunk_size); o = loghandle->lgh_obj; LASSERT(o); - lgi->lgi_buf.lb_len = sizeof(struct llog_log_hdr); + chunk_size = loghandle->lgh_ctxt->loc_chunk_size; + lgi->lgi_buf.lb_len = chunk_size; lgi->lgi_buf.lb_buf = NULL; /* each time we update header */ rc = dt_declare_record_write(env, o, &lgi->lgi_buf, 0, @@ -299,7 +319,7 @@ static int llog_osd_declare_write_rec(const struct lu_env *env, * the pad record can be inserted so take into account double * record size */ - lgi->lgi_buf.lb_len = rec->lrh_len * 2; + lgi->lgi_buf.lb_len = chunk_size * 2; lgi->lgi_buf.lb_buf = NULL; /* XXX: implement declared window or multi-chunks approach */ rc = dt_declare_record_write(env, o, &lgi->lgi_buf, -1, th); @@ -342,8 +362,8 @@ static int llog_osd_write_rec(const struct lu_env *env, int index, rc; struct llog_rec_tail *lrt; struct dt_object *o; + __u32 chunk_size; size_t left; - bool header_is_updated = false; ENTRY; @@ -354,14 +374,15 @@ static int llog_osd_write_rec(const struct lu_env *env, LASSERT(o); LASSERT(th); + chunk_size = llh->llh_hdr.lrh_len; CDEBUG(D_OTHER, "new record %x to "DFID"\n", rec->lrh_type, PFID(lu_object_fid(&o->do_lu))); - /* record length should not bigger than LLOG_CHUNK_SIZE */ - if (reclen > 
LLOG_CHUNK_SIZE) + /* record length should not be bigger than the llog chunk size */ + if (reclen > loghandle->lgh_hdr->llh_hdr.lrh_len) RETURN(-E2BIG); - rc = dt_attr_get(env, o, &lgi->lgi_attr, NULL); + rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) RETURN(rc); @@ -386,7 +407,7 @@ static int llog_osd_write_rec(const struct lu_env *env, /* llog can be empty only when first record is being written */ LASSERT(ergo(idx > 0, lgi->lgi_attr.la_size > 0)); - if (!ext2_test_bit(idx, llh->llh_bitmap)) { + if (!ext2_test_bit(idx, LLOG_HDR_BITMAP(llh))) { CERROR("%s: modify unset record %u\n", o->do_lu.lo_dev->ld_obd->obd_name, idx); RETURN(-ENOENT); @@ -401,7 +422,7 @@ static int llog_osd_write_rec(const struct lu_env *env, if (idx == LLOG_HEADER_IDX) { /* llog header update */ - LASSERT(reclen == sizeof(struct llog_log_hdr)); + LASSERT(reclen >= sizeof(struct llog_log_hdr)); LASSERT(rec == &llh->llh_hdr); lgi->lgi_off = 0; @@ -472,7 +493,7 @@ static int llog_osd_write_rec(const struct lu_env *env, */ LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); lgi->lgi_off = lgi->lgi_attr.la_size; - left = LLOG_CHUNK_SIZE - (lgi->lgi_off & (LLOG_CHUNK_SIZE - 1)); + left = chunk_size - (lgi->lgi_off & (chunk_size - 1)); /* NOTE: padding is a record, but no bit is set */ if (left != 0 && left != reclen && left < (reclen + LLOG_MIN_REC_SIZE)) { @@ -483,20 +504,20 @@ static int llog_osd_write_rec(const struct lu_env *env, loghandle->lgh_last_idx++; /* for pad rec */ } /* if it's the last idx in log file, then return -ENOSPC */ - if (loghandle->lgh_last_idx >= LLOG_BITMAP_SIZE(llh) - 1) + if (loghandle->lgh_last_idx >= LLOG_HDR_BITMAP_SIZE(llh) - 1) RETURN(-ENOSPC); /* increment the last_idx along with llh_tail index, they should * be equal for a llog lifetime */ loghandle->lgh_last_idx++; index = loghandle->lgh_last_idx; - llh->llh_tail.lrt_index = index; + LLOG_HDR_TAIL(llh)->lrt_index = index; /** * NB: the caller should make sure only 1 process access * the lgh_last_idx, e.g. append should be exclusive. 
* Otherwise it might hit the assert. */ - LASSERT(index < LLOG_BITMAP_SIZE(llh)); + LASSERT(index < LLOG_HDR_BITMAP_SIZE(llh)); rec->lrh_index = index; lrt = rec_tail(rec); lrt->lrt_len = rec->lrh_len; @@ -504,38 +525,88 @@ static int llog_osd_write_rec(const struct lu_env *env, /* the lgh_hdr_lock protects llog header data from concurrent * update/cancel, the llh_count and llh_bitmap are protected */ - spin_lock(&loghandle->lgh_hdr_lock); - if (ext2_set_bit(index, llh->llh_bitmap)) { + down_write(&loghandle->lgh_hdr_lock); + if (ext2_set_bit(index, LLOG_HDR_BITMAP(llh))) { CERROR("%s: index %u already set in log bitmap\n", o->do_lu.lo_dev->ld_obd->obd_name, index); - spin_unlock(&loghandle->lgh_hdr_lock); + up_write(&loghandle->lgh_hdr_lock); LBUG(); /* should never happen */ } llh->llh_count++; - spin_unlock(&loghandle->lgh_hdr_lock); - lgi->lgi_off = 0; - lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len; - lgi->lgi_buf.lb_buf = &llh->llh_hdr; - rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); + /* XXX It is a bit tricky here, if the log object is local, + * we do not need the lock during write here, because if there is + * a race, the transaction(jbd2, what about ZFS?) will make sure the + * conflicts will all be committed in the same transaction group. + * But for remote object, we need to lock the whole process, so to + * set the version of the remote transaction to make sure they + * are being sent in order. (see osp_md_write()) */ + if (!dt_object_remote(o)) + up_write(&loghandle->lgh_hdr_lock); + + if (lgi->lgi_attr.la_size == 0) { + lgi->lgi_off = 0; + lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len; + lgi->lgi_buf.lb_buf = &llh->llh_hdr; + rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); + if (rc != 0) + GOTO(out_remote_unlock, rc); + } else { + /* Note: If this is not initialization (size == 0), then do not + * write the whole header (8k bytes), only update header/tail + * and the bits that need to be updated. 
Because this update might be + * part of a cross-MDT operation, which needs to write these + * updates into the update log (32KB limit) and also pack them inside + * the RPC (1MB limit), writing 8K for each operation + * would cost a lot of space and keep us from adding more updates to one + * update log. */ + lgi->lgi_off = offsetof(typeof(*llh), llh_count); + lgi->lgi_buf.lb_len = sizeof(llh->llh_count); + lgi->lgi_buf.lb_buf = &llh->llh_count; + rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); + if (rc != 0) + GOTO(out_remote_unlock, rc); + + lgi->lgi_off = offsetof(typeof(*llh), + llh_bitmap[index / (sizeof(*llh->llh_bitmap) * 8)]); + lgi->lgi_buf.lb_len = sizeof(*llh->llh_bitmap); + lgi->lgi_buf.lb_buf = + &llh->llh_bitmap[index/(sizeof(*llh->llh_bitmap)*8)]; + rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); + if (rc != 0) + GOTO(out_remote_unlock, rc); + + lgi->lgi_off = (unsigned long)LLOG_HDR_TAIL(llh) - + (unsigned long)llh; + lgi->lgi_buf.lb_len = sizeof(llh->llh_tail); + lgi->lgi_buf.lb_buf = LLOG_HDR_TAIL(llh); + rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); + if (rc != 0) + GOTO(out_remote_unlock, rc); + } + +out_remote_unlock: + /* unlock here for remote object */ + if (dt_object_remote(o)) + up_write(&loghandle->lgh_hdr_lock); if (rc) GOTO(out, rc); - header_is_updated = true; - rc = dt_attr_get(env, o, &lgi->lgi_attr, NULL); + rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) GOTO(out, rc); LASSERT(lgi->lgi_attr.la_valid & LA_SIZE); - lgi->lgi_off = lgi->lgi_attr.la_size; + lgi->lgi_off = max_t(__u64, lgi->lgi_attr.la_size, lgi->lgi_off); lgi->lgi_buf.lb_len = reclen; lgi->lgi_buf.lb_buf = rec; rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); if (rc < 0) GOTO(out, rc); - CDEBUG(D_OTHER, "added record "DOSTID": idx: %u, %u\n", - POSTID(&loghandle->lgh_id.lgl_oi), index, rec->lrh_len); + CDEBUG(D_OTHER, "added record "DOSTID": idx: %u, %u off"LPU64"\n", + POSTID(&loghandle->lgh_id.lgl_oi), 
index, rec->lrh_len, + lgi->lgi_off); if (reccookie != NULL) { reccookie->lgc_lgl = loghandle->lgh_id; reccookie->lgc_index = index; @@ -551,22 +622,14 @@ static int llog_osd_write_rec(const struct lu_env *env, RETURN(rc); out: /* cleanup llog for error case */ - spin_lock(&loghandle->lgh_hdr_lock); - ext2_clear_bit(index, llh->llh_bitmap); + down_write(&loghandle->lgh_hdr_lock); + ext2_clear_bit(index, LLOG_HDR_BITMAP(llh)); llh->llh_count--; - spin_unlock(&loghandle->lgh_hdr_lock); + up_write(&loghandle->lgh_hdr_lock); /* restore llog last_idx */ loghandle->lgh_last_idx--; - llh->llh_tail.lrt_index = loghandle->lgh_last_idx; - - /* restore the header on disk if it was written */ - if (header_is_updated) { - lgi->lgi_off = 0; - lgi->lgi_buf.lb_len = llh->llh_hdr.lrh_len; - lgi->lgi_buf.lb_buf = &llh->llh_hdr; - dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); - } + LLOG_HDR_TAIL(llh)->lrt_index = loghandle->lgh_last_idx; RETURN(rc); } @@ -578,12 +641,45 @@ out: * actual records are larger than minimum size) we just skip * some more records. */ -static inline void llog_skip_over(__u64 *off, int curr, int goal) +static inline void llog_skip_over(struct llog_log_hdr *llh, __u64 *off, + int curr, int goal, __u32 chunk_size) { - if (goal <= curr) + if (goal > curr) { + if (llh->llh_size == 0) { + /* variable size records */ + *off = (*off + (goal - curr - 1) * LLOG_MIN_REC_SIZE); + } else { + *off = chunk_size + (goal - 1) * llh->llh_size; + } + } + /* always align with lower chunk boundary*/ + *off &= ~(chunk_size - 1); +} + +/** + * Remove optional fields that the client doesn't expect. + * This is typically in order to ensure compatibility with older clients. + * It is assumed that since we exclusively remove fields, the block will be + * big enough to handle the remapped records. It is also assumed that records + * of a block have the same format (i.e.: the same features enabled). + * + * \param[in,out] hdr Header of the block of records to remap. 
+ * \param[in,out] last_hdr Last header, don't read past this point. + * \param[in] flags Flags describing the fields to keep. + */ +static void changelog_block_trim_ext(struct llog_rec_hdr *hdr, + struct llog_rec_hdr *last_hdr, + enum changelog_rec_flags flags) +{ + if (hdr->lrh_type != CHANGELOG_REC) return; - *off = (*off + (goal - curr - 1) * LLOG_MIN_REC_SIZE) & - ~(LLOG_CHUNK_SIZE - 1); + + do { + struct changelog_rec *rec = (struct changelog_rec *)(hdr + 1); + + changelog_remap_rec(rec, rec->cr_flags & flags); + hdr = llog_rec_hdr_next(hdr); + } while ((char *)hdr <= (char *)last_hdr); } /** @@ -599,7 +695,7 @@ static inline void llog_skip_over(__u64 *off, int curr, int goal) * \param[in,out] cur_offset furtherst point read in the file * \param[in] buf pointer to data buffer to fill * \param[in] len required len to read, it is - * LLOG_CHUNK_SIZE usually. + * usually llog chunk_size. * * \retval 0 on successful buffer read * \retval negative value on error @@ -613,13 +709,15 @@ static int llog_osd_next_block(const struct lu_env *env, struct dt_object *o; struct dt_device *dt; int rc; + __u32 chunk_size; ENTRY; LASSERT(env); LASSERT(lgi); - if (len == 0 || len & (LLOG_CHUNK_SIZE - 1)) + chunk_size = loghandle->lgh_hdr->llh_hdr.lrh_len; + if (len == 0 || len & (chunk_size - 1)) RETURN(-EINVAL); CDEBUG(D_OTHER, "looking for log index %u (cur idx %u off "LPU64")\n", @@ -634,7 +732,7 @@ static int llog_osd_next_block(const struct lu_env *env, dt = lu2dt_dev(o->do_lu.lo_dev); LASSERT(dt); - rc = dt_attr_get(env, o, &lgi->lgi_attr, BYPASS_CAPA); + rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) GOTO(out, rc); @@ -642,11 +740,12 @@ static int llog_osd_next_block(const struct lu_env *env, struct llog_rec_hdr *rec, *last_rec; struct llog_rec_tail *tail; - llog_skip_over(cur_offset, *cur_idx, next_idx); + llog_skip_over(loghandle->lgh_hdr, cur_offset, *cur_idx, + next_idx, chunk_size); - /* read up to next LLOG_CHUNK_SIZE block */ - lgi->lgi_buf.lb_len = 
LLOG_CHUNK_SIZE - - (*cur_offset & (LLOG_CHUNK_SIZE - 1)); + /* read up to next llog chunk_size block */ + lgi->lgi_buf.lb_len = chunk_size - + (*cur_offset & (chunk_size - 1)); lgi->lgi_buf.lb_buf = buf; rc = dt_read(env, o, &lgi->lgi_buf, cur_offset); @@ -685,7 +784,7 @@ static int llog_osd_next_block(const struct lu_env *env, sizeof(struct llog_rec_tail)); /* get the last record in block */ last_rec = (struct llog_rec_hdr *)((char *)buf + rc - - le32_to_cpu(tail->lrt_len)); + tail->lrt_len); if (LLOG_REC_HDR_NEEDS_SWABBING(last_rec)) lustre_swab_llog_rec(last_rec); @@ -713,6 +812,12 @@ static int llog_osd_next_block(const struct lu_env *env, rec->lrh_index, next_idx); GOTO(out, rc = -ENOENT); } + + /* Trim unsupported extensions for compat w/ older clients */ + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID)) + changelog_block_trim_ext(rec, last_rec, + CLF_VERSION | CLF_RENAME); + GOTO(out, rc = 0); } GOTO(out, rc = -EIO); @@ -732,7 +837,7 @@ out: * \param[in] loghandle llog handle of the current llog * \param[in] prev_idx target index to find * \param[in] buf pointer to data buffer to fill - * \param[in] len required len to read, it is LLOG_CHUNK_SIZE usually. + * \param[in] len required len to read, it is llog_chunk_size usually. 
* * \retval 0 on successful buffer read * \retval negative value on error @@ -745,11 +850,13 @@ static int llog_osd_prev_block(const struct lu_env *env, struct dt_object *o; struct dt_device *dt; loff_t cur_offset; + __u32 chunk_size; int rc; ENTRY; - if (len == 0 || len & (LLOG_CHUNK_SIZE - 1)) + chunk_size = loghandle->lgh_hdr->llh_hdr.lrh_len; + if (len == 0 || len & (chunk_size - 1)) RETURN(-EINVAL); CDEBUG(D_OTHER, "looking for log index %u\n", prev_idx); @@ -763,10 +870,11 @@ static int llog_osd_prev_block(const struct lu_env *env, dt = lu2dt_dev(o->do_lu.lo_dev); LASSERT(dt); - cur_offset = LLOG_CHUNK_SIZE; - llog_skip_over(&cur_offset, 0, prev_idx); + cur_offset = chunk_size; + llog_skip_over(loghandle->lgh_hdr, &cur_offset, 0, prev_idx, + chunk_size); - rc = dt_attr_get(env, o, &lgi->lgi_attr, BYPASS_CAPA); + rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) GOTO(out, rc); @@ -831,6 +939,12 @@ static int llog_osd_prev_block(const struct lu_env *env, rec->lrh_index, prev_idx); GOTO(out, rc = -ENOENT); } + + /* Trim unsupported extensions for compat w/ older clients */ + if (!(loghandle->lgh_hdr->llh_flags & LLOG_F_EXT_JOBID)) + changelog_block_trim_ext(rec, last_rec, + CLF_VERSION | CLF_RENAME); + GOTO(out, rc = 0); } GOTO(out, rc = -EIO); @@ -848,8 +962,8 @@ out: * \retval dt_object of llog directory * \retval ERR_PTR of negative value on error */ -struct dt_object *llog_osd_dir_get(const struct lu_env *env, - struct llog_ctxt *ctxt) +static struct dt_object *llog_osd_dir_get(const struct lu_env *env, + struct llog_ctxt *ctxt) { struct dt_device *dt; struct dt_thread_info *dti = dt_info(env); @@ -905,7 +1019,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle, struct dt_object *o; struct dt_device *dt; struct ls_device *ls; - struct local_oid_storage *los; + struct local_oid_storage *los = NULL; int rc = 0; ENTRY; @@ -916,6 +1030,25 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle, 
LASSERT(ctxt->loc_exp->exp_obd); dt = ctxt->loc_exp->exp_obd->obd_lvfs_ctxt.dt; LASSERT(dt); + if (ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) { + if (logid != NULL) { + logid_to_fid(logid, &lgi->lgi_fid); + } else { + /* If logid == NULL, then it means the caller needs + * to allocate new FID (llog_cat_declare_add_rec()). */ + rc = obd_fid_alloc(env, ctxt->loc_exp, + &lgi->lgi_fid, NULL); + if (rc < 0) + RETURN(rc); + rc = 0; + } + + o = dt_locate(env, dt, &lgi->lgi_fid); + if (IS_ERR(o)) + RETURN(PTR_ERR(o)); + + goto after_open; + } ls = ls_device_get(dt); if (IS_ERR(ls)) @@ -965,6 +1098,7 @@ static int llog_osd_open(const struct lu_env *env, struct llog_handle *handle, if (IS_ERR(o)) GOTO(out_name, rc = PTR_ERR(o)); +after_open: /* No new llog is expected but doesn't exist */ if (open_param != LLOG_OPEN_NEW && !dt_object_exists(o)) GOTO(out_put, rc = -ENOENT); @@ -982,7 +1116,8 @@ out_name: if (handle->lgh_name != NULL) OBD_FREE(handle->lgh_name, strlen(name) + 1); out: - dt_los_put(los); + if (los != NULL) + dt_los_put(los); RETURN(rc); } @@ -1004,6 +1139,103 @@ static int llog_osd_exist(struct llog_handle *handle) } /** + * Get dir for regular fid log object + * + * Get directory for regular fid log object, and these regular fid log + * object will be inserted under this directory, to satisfy the FS + * consistency check, e2fsck etc. + * + * \param [in] env execution environment + * \param [in] dto llog object + * + * \retval pointer to the directory if it is found. + * \retval ERR_PTR(negative errno) if it fails. 
+ */ +struct dt_object *llog_osd_get_regular_fid_dir(const struct lu_env *env, + struct dt_object *dto) +{ + struct llog_thread_info *lgi = llog_info(env); + struct seq_server_site *ss = dto->do_lu.lo_dev->ld_site->ld_seq_site; + struct lu_seq_range *range = &lgi->lgi_range; + struct lu_fid *dir_fid = &lgi->lgi_fid; + struct dt_object *dir; + int rc; + ENTRY; + + fld_range_set_any(range); + LASSERT(ss != NULL); + rc = ss->ss_server_fld->lsf_seq_lookup(env, ss->ss_server_fld, + fid_seq(lu_object_fid(&dto->do_lu)), range); + if (rc < 0) + RETURN(ERR_PTR(rc)); + + lu_update_log_dir_fid(dir_fid, range->lsr_index); + dir = dt_locate(env, lu2dt_dev(dto->do_lu.lo_dev), dir_fid); + if (IS_ERR(dir)) + RETURN(dir); + + if (!dt_try_as_dir(env, dir)) { + lu_object_put(env, &dir->do_lu); + RETURN(ERR_PTR(-ENOTDIR)); + } + + RETURN(dir); +} + +/** + * Add llog object with regular FID to name entry + * + * Add llog object with regular FID to name space, and each llog + * object on each MDT will be /update_log_dir/[seq:oid:ver], + * so to satisfy the namespace consistency check, e2fsck etc. + * + * \param [in] env execution environment + * \param [in] dto llog object + * \param [in] th thandle + * \param [in] declare if it is declare or execution + * + * \retval 0 if insertion succeeds. + * \retval negative errno if insertion fails. 
+ */ +static int +llog_osd_regular_fid_add_name_entry(const struct lu_env *env, + struct dt_object *dto, + struct thandle *th, bool declare) +{ + struct llog_thread_info *lgi = llog_info(env); + const struct lu_fid *fid = lu_object_fid(&dto->do_lu); + struct dt_insert_rec *rec = &lgi->lgi_dt_rec; + struct dt_object *dir; + char *name = lgi->lgi_name; + int rc; + ENTRY; + + if (!fid_is_norm(fid)) + RETURN(0); + + dir = llog_osd_get_regular_fid_dir(env, dto); + if (IS_ERR(dir)) + RETURN(PTR_ERR(dir)); + + rec->rec_fid = fid; + rec->rec_type = S_IFREG; + snprintf(name, sizeof(lgi->lgi_name), DFID, PFID(fid)); + dt_write_lock(env, dir, 0); + if (declare) { + rc = dt_declare_insert(env, dir, (struct dt_rec *)rec, + (struct dt_key *)name, th); + } else { + rc = dt_insert(env, dir, (struct dt_rec *)rec, + (struct dt_key *)name, th, 1); + } + dt_write_unlock(env, dir); + + lu_object_put(env, &dir->do_lu); + RETURN(rc); +} + + +/** * Implementation of the llog_operations::lop_declare_create * * This function declares the llog create. 
It declares also name insert @@ -1035,6 +1267,24 @@ static int llog_osd_declare_create(const struct lu_env *env, if (dt_object_exists(o)) RETURN(0); + if (res->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) { + struct llog_thread_info *lgi = llog_info(env); + + lgi->lgi_attr.la_valid = LA_MODE | LA_SIZE; + lgi->lgi_attr.la_size = 0; + lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR; + lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG); + + rc = dt_declare_create(env, o, &lgi->lgi_attr, NULL, + &lgi->lgi_dof, th); + if (rc < 0) + RETURN(rc); + + + rc = llog_osd_regular_fid_add_name_entry(env, o, th, true); + + RETURN(rc); + } los = res->private_data; LASSERT(los); @@ -1098,6 +1348,26 @@ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res, if (dt_object_exists(o)) RETURN(-EEXIST); + if (res->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) { + struct llog_thread_info *lgi = llog_info(env); + + lgi->lgi_attr.la_valid = LA_MODE | LA_SIZE | LA_TYPE; + lgi->lgi_attr.la_size = 0; + lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR; + lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG); + + dt_write_lock(env, o, 0); + rc = dt_create(env, o, &lgi->lgi_attr, NULL, + &lgi->lgi_dof, th); + dt_write_unlock(env, o); + if (rc < 0) + RETURN(rc); + + rc = llog_osd_regular_fid_add_name_entry(env, o, th, false); + + RETURN(rc); + } + los = res->private_data; LASSERT(los); @@ -1124,7 +1394,7 @@ static int llog_osd_create(const struct lu_env *env, struct llog_handle *res, dt_read_lock(env, llog_dir, 0); rc = dt_insert(env, llog_dir, (struct dt_rec *)rec, (struct dt_key *)res->lgh_name, - th, BYPASS_CAPA, 1); + th, 1); dt_read_unlock(env, llog_dir); lu_object_put(env, &llog_dir->do_lu); if (rc) @@ -1156,8 +1426,15 @@ static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle) LASSERT(handle->lgh_obj); - lu_object_put(env, &handle->lgh_obj->do_lu); - + if (handle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) { + /* Remove the object from 
the cache, otherwise it may + * hold LOD being released during cleanup process */ + lu_object_put_nocache(env, &handle->lgh_obj->do_lu); + LASSERT(handle->private_data == NULL); + RETURN(rc); + } else { + lu_object_put(env, &handle->lgh_obj->do_lu); + } los = handle->private_data; LASSERT(los); dt_los_put(los); @@ -1169,6 +1446,54 @@ static int llog_osd_close(const struct lu_env *env, struct llog_handle *handle) } /** + * delete llog object name entry + * + * Delete llog object (with regular FID) from name space (under + * update_log_dir). + * + * \param [in] env execution environment + * \param [in] dto llog object + * \param [in] th thandle + * \param [in] declare if it is declare or execution + * + * \retval 0 if deletion succeeds. + * \retval negative errno if deletion fails. + */ +static int +llog_osd_regular_fid_del_name_entry(const struct lu_env *env, + struct dt_object *dto, + struct thandle *th, bool declare) +{ + struct llog_thread_info *lgi = llog_info(env); + const struct lu_fid *fid = lu_object_fid(&dto->do_lu); + struct dt_object *dir; + char *name = lgi->lgi_name; + int rc; + ENTRY; + + if (!fid_is_norm(fid)) + RETURN(0); + + dir = llog_osd_get_regular_fid_dir(env, dto); + if (IS_ERR(dir)) + RETURN(PTR_ERR(dir)); + + snprintf(name, sizeof(lgi->lgi_name), DFID, PFID(fid)); + dt_write_lock(env, dir, 0); + if (declare) { + rc = dt_declare_delete(env, dir, (struct dt_key *)name, + th); + } else { + rc = dt_delete(env, dir, (struct dt_key *)name, th); + } + dt_write_unlock(env, dir); + + lu_object_put(env, &dir->do_lu); + RETURN(rc); +} + + +/** * Implementation of the llog_operations::lop_destroy * * This function destroys the llog and deletes also entry in the @@ -1220,23 +1545,33 @@ static int llog_osd_destroy(const struct lu_env *env, GOTO(out_trans, rc); } - dt_declare_ref_del(env, o, th); + rc = dt_declare_ref_del(env, o, th); + if (rc < 0) + GOTO(out_trans, rc); rc = dt_declare_destroy(env, o, th); if (rc) GOTO(out_trans, rc); + if 
(loghandle->lgh_ctxt->loc_flags & LLOG_CTXT_FLAG_NORMAL_FID) { + rc = llog_osd_regular_fid_del_name_entry(env, o, th, true); + if (rc < 0) + GOTO(out_trans, rc); + } + rc = dt_trans_start_local(env, d, th); if (rc) GOTO(out_trans, rc); + th->th_wait_submit = 1; + dt_write_lock(env, o, 0); if (dt_object_exists(o)) { if (name) { dt_read_lock(env, llog_dir, 0); rc = dt_delete(env, llog_dir, (struct dt_key *) name, - th, BYPASS_CAPA); + th); dt_read_unlock(env, llog_dir); if (rc) { CERROR("%s: can't remove llog %s: rc = %d\n", @@ -1249,12 +1584,20 @@ static int llog_osd_destroy(const struct lu_env *env, rc = dt_destroy(env, o, th); if (rc) GOTO(out_unlock, rc); + + if (loghandle->lgh_ctxt->loc_flags & + LLOG_CTXT_FLAG_NORMAL_FID) { + rc = llog_osd_regular_fid_del_name_entry(env, o, th, + false); + if (rc < 0) + GOTO(out_unlock, rc); + } } out_unlock: dt_write_unlock(env, o); out_trans: dt_trans_stop(env, d, th); - if (llog_dir != NULL) + if (!(IS_ERR_OR_NULL(llog_dir))) lu_object_put(env, &llog_dir->do_lu); RETURN(rc); } @@ -1289,6 +1632,9 @@ static int llog_osd_setup(const struct lu_env *env, struct obd_device *obd, ctxt = llog_ctxt_get(olg->olg_ctxts[ctxt_idx]); LASSERT(ctxt); + if (disk_obd == NULL) + GOTO(out, rc = 0); + /* initialize data allowing to generate new fids, * literally we need a sequece */ lgi->lgi_fid.f_seq = FID_SEQ_LLOG; @@ -1360,6 +1706,25 @@ struct llog_operations llog_osd_ops = { }; EXPORT_SYMBOL(llog_osd_ops); +struct llog_operations llog_common_cat_ops = { + .lop_next_block = llog_osd_next_block, + .lop_prev_block = llog_osd_prev_block, + .lop_read_header = llog_osd_read_header, + .lop_destroy = llog_osd_destroy, + .lop_setup = llog_osd_setup, + .lop_cleanup = llog_osd_cleanup, + .lop_open = llog_osd_open, + .lop_exist = llog_osd_exist, + .lop_declare_create = llog_osd_declare_create, + .lop_create = llog_osd_create, + .lop_declare_write_rec = llog_osd_declare_write_rec, + .lop_write_rec = llog_osd_write_rec, + .lop_close = llog_osd_close, + 
.lop_add = llog_cat_add_rec, + .lop_declare_add = llog_cat_declare_add_rec, +}; +EXPORT_SYMBOL(llog_common_cat_ops); + /** * Read the special file which contains the list of llog catalogs IDs * @@ -1411,6 +1776,12 @@ int llog_osd_get_cat_list(const struct lu_env *env, struct dt_device *d, lgi->lgi_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR; lgi->lgi_dof.dof_type = dt_mode_to_dft(S_IFREG); + th->th_wait_submit = 1; + /* Make the llog object creation synchronous, so + * that references to it are reliable, especially + * remote references */ + th->th_sync = 1; + rc = dt_declare_create(env, o, &lgi->lgi_attr, NULL, &lgi->lgi_dof, th); if (rc) @@ -1431,7 +1802,7 @@ out_trans: GOTO(out, rc); } - rc = dt_attr_get(env, o, &lgi->lgi_attr, BYPASS_CAPA); + rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) GOTO(out, rc); @@ -1461,7 +1832,9 @@ out_trans: lgi->lgi_buf.lb_buf = idarray; lgi->lgi_buf.lb_len = size; rc = dt_record_read(env, o, &lgi->lgi_buf, &lgi->lgi_off); - if (rc) { + /* -EFAULT means the llog is a sparse file. This is not an error + * after arbitrary OST index is supported. */ + if (rc < 0 && rc != -EFAULT) { CERROR("%s: error reading CATALOGS: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, rc); GOTO(out, rc); @@ -1516,7 +1889,7 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d, if (!dt_object_exists(o)) GOTO(out, rc = -ENOENT); - rc = dt_attr_get(env, o, &lgi->lgi_attr, BYPASS_CAPA); + rc = dt_attr_get(env, o, &lgi->lgi_attr); if (rc) GOTO(out, rc); @@ -1541,6 +1914,8 @@ int llog_osd_put_cat_list(const struct lu_env *env, struct dt_device *d, if (rc) GOTO(out_trans, rc); + th->th_wait_submit = 1; + rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th); if (rc) CDEBUG(D_INODE, "can't write CATALOGS at index %d: rc = %d\n",