From: Mikhail Pershin Date: Thu, 18 Oct 2012 19:01:49 +0000 (+0400) Subject: LU-2129 llog: protect llog write against concurrent read X-Git-Tag: 2.3.56~1 X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=commitdiff_plain;h=6c37a25e0084143a2c5e00f765a084306df90167 LU-2129 llog: protect llog write against concurrent read llog_write_rec contains case when buffer passed separately from header and tail, resulting 3 writes instead of single one, so concurrent read may read partial record. Another similar case is llog_pad, which writes header and tail with required gap to align records at the end of 8K buffer. Use dt_read/write locks on llog object to protect writes against read. Such '3-writes' case happens only with configs llogs. Signed-off-by: Mikhail Pershin Change-Id: Idfb94706b2cf8ad22776c1a277b862863f94c2f6 Reviewed-on: http://review.whamcloud.com/4303 Reviewed-by: Andreas Dilger Tested-by: Hudson Tested-by: Maloo Reviewed-by: Alex Zhuravlev --- diff --git a/lustre/obdclass/llog_osd.c b/lustre/obdclass/llog_osd.c index 37889f5..80904c5 100644 --- a/lustre/obdclass/llog_osd.c +++ b/lustre/obdclass/llog_osd.c @@ -109,11 +109,12 @@ static int llog_osd_pad(const struct lu_env *env, struct dt_object *o, lgi->lgi_buf.lb_buf = &lgi->lgi_lrh; lgi->lgi_buf.lb_len = sizeof(lgi->lgi_lrh); + dt_write_lock(env, o, 0); rc = dt_record_write(env, o, &lgi->lgi_buf, off, th); if (rc) { CERROR("%s: error writing padding record: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, rc); - RETURN(rc); + GOTO(out, rc); } lgi->lgi_buf.lb_buf = &lgi->lgi_tail; @@ -123,7 +124,8 @@ static int llog_osd_pad(const struct lu_env *env, struct dt_object *o, if (rc) CERROR("%s: error writing padding record: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, rc); - +out: + dt_write_unlock(env, o); RETURN(rc); } @@ -157,6 +159,8 @@ static int llog_osd_write_blob(const struct lu_env *env, struct dt_object *o, } /* the buf case */ + /* protect the following 3 writes from concurrent read */ + dt_write_lock(env, o, 0); rec->lrh_len = sizeof(*rec) + buflen + sizeof(lgi->lgi_tail); lgi->lgi_buf.lb_len = sizeof(*rec); lgi->lgi_buf.lb_buf = rec; @@ -185,6 +189,7 @@ static int llog_osd_write_blob(const struct lu_env *env, struct dt_object *o, CERROR("%s: error writing log tail: rc = %d\n", o->do_lu.lo_dev->ld_obd->obd_name, rc); out: + dt_write_unlock(env, o); RETURN(rc); } @@ -536,7 +541,19 @@ static int llog_osd_next_block(const struct lu_env *env, lgi->lgi_buf.lb_len = LLOG_CHUNK_SIZE - (*cur_offset & (LLOG_CHUNK_SIZE - 1)); lgi->lgi_buf.lb_buf = buf; + + /* Note: read lock is not needed around la_size get above at + * the time of dt_attr_get(). There are only two cases that + * matter. Either la_size == cur_offset, in which case the + * entire read is skipped, or la_size > cur_offset and the loop + * is entered and this thread is blocked at dt_read_lock() + * until the write is completed. When the write completes, then + * the dt_read() will be done with the full length, and will + * get the full data. + */ + dt_read_lock(env, o, 0); rc = dt_read(env, o, &lgi->lgi_buf, cur_offset); + dt_read_unlock(env, o); if (rc < 0) { CERROR("%s: can't read llog block from log "DFID " offset "LPU64": rc = %d\n", @@ -646,7 +663,12 @@ static int llog_osd_prev_block(const struct lu_env *env, lgi->lgi_buf.lb_len = len; lgi->lgi_buf.lb_buf = buf; + /* It is OK to have locking around dt_read() only, see + * comment in llog_osd_next_block for details + */ + dt_read_lock(env, o, 0); rc = dt_read(env, o, &lgi->lgi_buf, &cur_offset); + dt_read_unlock(env, o); if (rc < 0) { CERROR("%s: can't read llog block from log "DFID " offset "LPU64": rc = %d\n",