ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT);
LASSERT(ctxt != NULL);
ctxt->loc_flags |= LLOG_CTXT_FLAG_NORMAL_FID;
-
+ ctxt->loc_chunk_size = LLOG_MIN_CHUNK_SIZE * 4;
if (likely(logid_id(&cid->lci_logid) != 0)) {
rc = llog_open(env, ctxt, &lgh, &cid->lci_logid, NULL,
LLOG_OPEN_EXISTS);
struct llog_thread_info *lgi;
enum llog_flag flags;
int rc;
- __u32 max_size = handle->lgh_hdr_size;
ENTRY;
lgi->lgi_off = 0;
lgi->lgi_buf.lb_buf = handle->lgh_hdr;
- lgi->lgi_buf.lb_len = max_size;
- rc = dt_record_read(env, o, &lgi->lgi_buf, &lgi->lgi_off);
- if (rc) {
- CERROR("%s: error reading log header from "DFID": rc = %d\n",
+ lgi->lgi_buf.lb_len = handle->lgh_hdr_size;
+ rc = dt_read(env, o, &lgi->lgi_buf, &lgi->lgi_off);
+ llh_hdr = &handle->lgh_hdr->llh_hdr;
+ if (rc < sizeof(*llh_hdr) || rc < llh_hdr->lrh_len) {
+ CERROR("%s: error reading "DFID" log header size %d: rc = %d\n",
o->do_lu.lo_dev->ld_obd->obd_name,
- PFID(lu_object_fid(&o->do_lu)), rc);
+ PFID(lu_object_fid(&o->do_lu)), rc < 0 ? 0 : rc,
+ -EFAULT);
+
+ if (rc >= 0)
+ rc = -EFAULT;
+
RETURN(rc);
}
- llh_hdr = &handle->lgh_hdr->llh_hdr;
if (LLOG_REC_HDR_NEEDS_SWABBING(llh_hdr))
lustre_swab_llog_hdr(handle->lgh_hdr);
llh_hdr->lrh_type, LLOG_HDR_MAGIC);
RETURN(-EIO);
} else if (llh_hdr->lrh_len < LLOG_MIN_CHUNK_SIZE ||
- llh_hdr->lrh_len > max_size) {
+ llh_hdr->lrh_len > handle->lgh_hdr_size) {
CERROR("%s: incorrectly sized log %s "DFID" header: "
"%#x (expected at least %#x)\n"
"you may need to re-run lconf --write_conf.\n",
PFID(lu_object_fid(&o->do_lu)),
llh_hdr->lrh_len, LLOG_MIN_CHUNK_SIZE);
RETURN(-EIO);
+ } else if (LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index >
+ LLOG_HDR_BITMAP_SIZE(handle->lgh_hdr) ||
+ LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len !=
+ llh_hdr->lrh_len) {
+ CERROR("%s: incorrectly sized log %s "DFID" tailer: "
+ "%#x : rc = %d\n",
+ o->do_lu.lo_dev->ld_obd->obd_name,
+ handle->lgh_name ? handle->lgh_name : "",
+ PFID(lu_object_fid(&o->do_lu)),
+ LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len, -EIO);
+ RETURN(-EIO);
}
handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK);
int idx, struct thandle *th)
{
struct llog_thread_info *lgi = llog_info(env);
+ __u32 chunk_size;
struct dt_object *o;
int rc;
o = loghandle->lgh_obj;
LASSERT(o);
- lgi->lgi_buf.lb_len = sizeof(struct llog_log_hdr);
+ chunk_size = loghandle->lgh_ctxt->loc_chunk_size;
+ lgi->lgi_buf.lb_len = chunk_size;
lgi->lgi_buf.lb_buf = NULL;
/* each time we update header */
rc = dt_declare_record_write(env, o, &lgi->lgi_buf, 0,
* the pad record can be inserted so take into account double
* record size
*/
- lgi->lgi_buf.lb_len = rec->lrh_len * 2;
+ lgi->lgi_buf.lb_len = chunk_size * 2;
lgi->lgi_buf.lb_buf = NULL;
/* XXX: implement declared window or multi-chunks approach */
rc = dt_declare_record_write(env, o, &lgi->lgi_buf, -1, th);
if (rc != 0)
GOTO(out_remote_unlock, rc);
- lgi->lgi_off = offsetof(typeof(*llh), llh_tail);
+ lgi->lgi_off = (unsigned long)LLOG_HDR_TAIL(llh) -
+ (unsigned long)llh;
lgi->lgi_buf.lb_len = sizeof(llh->llh_tail);
- lgi->lgi_buf.lb_buf = &llh->llh_tail;
+ lgi->lgi_buf.lb_buf = LLOG_HDR_TAIL(llh);
rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
if (rc != 0)
GOTO(out_remote_unlock, rc);
struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev);
struct dt_device *dt_dev = &osp->opd_dt_dev;
struct lu_buf *lbuf = &osp_env_info(env)->osi_lb2;
- struct osp_update_request *update;
+ struct osp_update_request *update = NULL;
struct object_update_reply *reply;
struct out_read_reply *orr;
+ char *ptr = rbuf->lb_buf;
struct ptlrpc_request *req = NULL;
+ size_t total_length = rbuf->lb_len;
+ size_t max_buf_size;
+ loff_t offset = *pos;
int rc;
ENTRY;
- /* Because it needs send the update buffer right away,
- * just create an update buffer, instead of attaching the
- * update_remote list of the thandle. */
- update = osp_update_request_create(dt_dev);
- if (IS_ERR(update))
- RETURN(PTR_ERR(update));
+ /* Calculate the maxium buffer length for each read request */
+ max_buf_size = OUT_UPDATE_REPLY_SIZE - cfs_size_round(sizeof(*orr)) -
+ cfs_size_round(sizeof(struct object_update_result)) -
+ cfs_size_round(offsetof(struct object_update_reply,
+ ourp_lens[1]));
+ while (total_length > 0) {
+ size_t read_length;
+
+ /* Because it needs send the update buffer right away,
+ * just create an update buffer, instead of attaching the
+ * update_remote list of the thandle. */
+ update = osp_update_request_create(dt_dev);
+ if (IS_ERR(update))
+ GOTO(out, rc = PTR_ERR(update));
+
+ read_length = total_length > max_buf_size ?
+ max_buf_size : total_length;
+
+ rc = osp_update_rpc_pack(env, read, update, OUT_READ,
+ lu_object_fid(&dt->do_lu),
+ read_length, offset);
+ if (rc != 0) {
+ CERROR("%s: cannot insert update: rc = %d\n",
+ dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
+ GOTO(out, rc);
+ }
- rc = osp_update_rpc_pack(env, read, update, OUT_READ,
- lu_object_fid(&dt->do_lu), rbuf->lb_len, *pos);
- if (rc != 0) {
- CERROR("%s: cannot insert update: rc = %d\n",
- dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
- GOTO(out, rc);
- }
+ rc = osp_remote_sync(env, osp, update, &req);
+ if (rc < 0)
+ GOTO(out, rc);
- rc = osp_remote_sync(env, osp, update, &req);
- if (rc < 0)
- GOTO(out, rc);
+ reply = req_capsule_server_sized_get(&req->rq_pill,
+ &RMF_OUT_UPDATE_REPLY,
+ OUT_UPDATE_REPLY_SIZE);
- reply = req_capsule_server_sized_get(&req->rq_pill,
- &RMF_OUT_UPDATE_REPLY,
- OUT_UPDATE_REPLY_SIZE);
- if (reply->ourp_magic != UPDATE_REPLY_MAGIC) {
- CERROR("%s: invalid update reply magic %x expected %x:"
- " rc = %d\n", dt_dev->dd_lu_dev.ld_obd->obd_name,
- reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO);
- GOTO(out, rc = -EPROTO);
- }
+ if (reply->ourp_magic != UPDATE_REPLY_MAGIC) {
+ CERROR("%s: invalid update reply magic %x expected %x:"
+ " rc = %d\n", dt_dev->dd_lu_dev.ld_obd->obd_name,
+ reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO);
+ GOTO(out, rc = -EPROTO);
+ }
- rc = object_update_result_data_get(reply, lbuf, 0);
- if (rc < 0)
- GOTO(out, rc);
+ rc = object_update_result_data_get(reply, lbuf, 0);
+ if (rc < 0)
+ GOTO(out, rc);
- if (lbuf->lb_len < sizeof(*orr))
- GOTO(out, rc = -EPROTO);
+ if (lbuf->lb_len < sizeof(*orr))
+ GOTO(out, rc = -EPROTO);
- orr = lbuf->lb_buf;
- orr_le_to_cpu(orr, orr);
+ orr = lbuf->lb_buf;
+ orr_le_to_cpu(orr, orr);
+ offset = orr->orr_offset;
+ if (orr->orr_size > max_buf_size)
+ GOTO(out, rc = -EPROTO);
- *pos = orr->orr_offset;
+ memcpy(ptr, orr->orr_data, orr->orr_size);
+ ptr += orr->orr_size;
+ total_length -= orr->orr_size;
- if (orr->orr_size > rbuf->lb_len)
- GOTO(out, rc = -EPROTO);
+ CDEBUG(D_INFO, "%s: read "DFID" pos "LPU64" len %u left %zu\n",
+ osp->opd_obd->obd_name, PFID(lu_object_fid(&dt->do_lu)),
+ offset, orr->orr_size, total_length);
- memcpy(rbuf->lb_buf, orr->orr_data, orr->orr_size);
+ if (orr->orr_size < read_length)
+ break;
- CDEBUG(D_INFO, "%s: read "DFID" pos "LPU64" len %u\n",
+ ptlrpc_req_finished(req);
+ osp_update_request_destroy(update);
+ req = NULL;
+ update = NULL;
+ }
+
+ total_length = rbuf->lb_len - total_length;
+ *pos = offset;
+ CDEBUG(D_INFO, "%s: total read "DFID" pos "LPU64" len %zu\n",
osp->opd_obd->obd_name, PFID(lu_object_fid(&dt->do_lu)),
- *pos, orr->orr_size);
- GOTO(out, rc = (int)orr->orr_size);
+ *pos, total_length);
+ GOTO(out, rc = (int)total_length);
out:
if (req != NULL)
ptlrpc_req_finished(req);
- osp_update_request_destroy(update);
+ if (update != NULL)
+ osp_update_request_destroy(update);
return rc;
}
}
pos = le64_to_cpu(*(__u64 *)(tmp));
+ /* Check if the read buffer can hold the read_size */
if (size > OUT_UPDATE_REPLY_SIZE -
- cfs_size_round((unsigned long)update_result->our_data -
- (unsigned long)update_result) - sizeof(pos)) {
+ cfs_size_round(offsetof(struct object_update_reply,
+ ourp_lens[1])) -
+ cfs_size_round(sizeof(*update_result)) -
+ cfs_size_round(sizeof(*orr))) {
CERROR("%s: get %zu the biggest read size is %d: rc = %d\n",
tgt_name(tsi->tsi_tgt), size, OUT_UPDATE_REPLY_SIZE,
-EPROTO);
tti->tti_u.update.tti_update_reply = reply;
tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
- /* Walk through updates in the request to execute them synchronously */
+ /* validate the request and calculate the total update count and
+ * set it to reply */
for (i = 0; i < update_buf_count; i++) {
- struct tgt_handler *h;
- struct dt_object *dt_obj;
- int update_count;
struct object_update_request *our;
- int j;
+ int update_count;
our = update_bufs[i];
if (ptlrpc_req_need_swab(pill->rc_req))
" expect %x: rc = %d\n",
tgt_name(tsi->tsi_tgt), our->ourq_magic,
UPDATE_REQUEST_MAGIC, -EPROTO);
- GOTO(out, rc = -EPROTO);
+ GOTO(out_free, rc = -EPROTO);
}
+ update_count = our->ourq_count;
+ reply->ourp_count += update_count;
+ }
+
+ /* Walk through updates in the request to execute them */
+ for (i = 0; i < update_buf_count; i++) {
+ struct tgt_handler *h;
+ struct dt_object *dt_obj;
+ int update_count;
+ struct object_update_request *our;
+ int j;
+ our = update_bufs[i];
update_count = our->ourq_count;
- reply->ourp_count += update_count;
for (j = 0; j < update_count; j++) {
update = object_update_request_get(our, j, NULL);
if (update == NULL)