Whamcloud - gitweb
LU-6602 llog: increase update llog chunk size 61/15161/11
author wang di <di.wang@intel.com>
Fri, 19 Jun 2015 23:49:05 +0000 (16:49 -0700)
committer Oleg Drokin <oleg.drokin@intel.com>
Fri, 3 Jul 2015 15:25:58 +0000 (15:25 +0000)
Increase the DNE update llog chunk size from 8KB to 32KB, so that
one cross-MDT operation can include more update records, and
striped directories can be created with a larger stripe count.

Signed-off-by: wang di <di.wang@intel.com>
Change-Id: I01d1435f1249c3048ac0a412bdb53f710e6e98b3
Reviewed-on: http://review.whamcloud.com/15161
Tested-by: Jenkins
Reviewed-by: Andreas Dilger <andreas.dilger@intel.com>
Reviewed-by: Lai Siyao <lai.siyao@intel.com>
Tested-by: Maloo <hpdd-maloo@intel.com>
Reviewed-by: James Simmons <uja.ornl@yahoo.com>
Reviewed-by: Oleg Drokin <oleg.drokin@intel.com>
lustre/lod/lod_sub_object.c
lustre/obdclass/llog_osd.c
lustre/osp/osp_md_object.c
lustre/target/out_handler.c

index f1df917..e86175e 100644 (file)
@@ -903,7 +903,7 @@ int lod_sub_prep_llog(const struct lu_env *env, struct lod_device *lod,
        ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT);
        LASSERT(ctxt != NULL);
        ctxt->loc_flags |= LLOG_CTXT_FLAG_NORMAL_FID;
-
+       ctxt->loc_chunk_size = LLOG_MIN_CHUNK_SIZE * 4;
        if (likely(logid_id(&cid->lci_logid) != 0)) {
                rc = llog_open(env, ctxt, &lgh, &cid->lci_logid, NULL,
                               LLOG_OPEN_EXISTS);
index 13bd82c..fb36839 100644 (file)
@@ -189,7 +189,6 @@ static int llog_osd_read_header(const struct lu_env *env,
        struct llog_thread_info *lgi;
        enum llog_flag           flags;
        int                      rc;
-       __u32                   max_size = handle->lgh_hdr_size;
 
        ENTRY;
 
@@ -213,16 +212,21 @@ static int llog_osd_read_header(const struct lu_env *env,
 
        lgi->lgi_off = 0;
        lgi->lgi_buf.lb_buf = handle->lgh_hdr;
-       lgi->lgi_buf.lb_len = max_size;
-       rc = dt_record_read(env, o, &lgi->lgi_buf, &lgi->lgi_off);
-       if (rc) {
-               CERROR("%s: error reading log header from "DFID": rc = %d\n",
+       lgi->lgi_buf.lb_len = handle->lgh_hdr_size;
+       rc = dt_read(env, o, &lgi->lgi_buf, &lgi->lgi_off);
+       llh_hdr = &handle->lgh_hdr->llh_hdr;
+       if (rc < sizeof(*llh_hdr) || rc < llh_hdr->lrh_len) {
+               CERROR("%s: error reading "DFID" log header size %d: rc = %d\n",
                       o->do_lu.lo_dev->ld_obd->obd_name,
-                      PFID(lu_object_fid(&o->do_lu)), rc);
+                      PFID(lu_object_fid(&o->do_lu)), rc < 0 ? 0 : rc,
+                      -EFAULT);
+
+               if (rc >= 0)
+                       rc = -EFAULT;
+
                RETURN(rc);
        }
 
-       llh_hdr = &handle->lgh_hdr->llh_hdr;
        if (LLOG_REC_HDR_NEEDS_SWABBING(llh_hdr))
                lustre_swab_llog_hdr(handle->lgh_hdr);
 
@@ -234,7 +238,7 @@ static int llog_osd_read_header(const struct lu_env *env,
                       llh_hdr->lrh_type, LLOG_HDR_MAGIC);
                RETURN(-EIO);
        } else if (llh_hdr->lrh_len < LLOG_MIN_CHUNK_SIZE ||
-                  llh_hdr->lrh_len > max_size) {
+                  llh_hdr->lrh_len > handle->lgh_hdr_size) {
                CERROR("%s: incorrectly sized log %s "DFID" header: "
                       "%#x (expected at least %#x)\n"
                       "you may need to re-run lconf --write_conf.\n",
@@ -243,6 +247,17 @@ static int llog_osd_read_header(const struct lu_env *env,
                       PFID(lu_object_fid(&o->do_lu)),
                       llh_hdr->lrh_len, LLOG_MIN_CHUNK_SIZE);
                RETURN(-EIO);
+       } else if (LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_index >
+                  LLOG_HDR_BITMAP_SIZE(handle->lgh_hdr) ||
+                  LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len !=
+                       llh_hdr->lrh_len) {
+               CERROR("%s: incorrectly sized log %s "DFID" tailer: "
+                      "%#x : rc = %d\n",
+                      o->do_lu.lo_dev->ld_obd->obd_name,
+                      handle->lgh_name ? handle->lgh_name : "",
+                      PFID(lu_object_fid(&o->do_lu)),
+                      LLOG_HDR_TAIL(handle->lgh_hdr)->lrt_len, -EIO);
+               RETURN(-EIO);
        }
 
        handle->lgh_hdr->llh_flags |= (flags & LLOG_F_EXT_MASK);
@@ -276,6 +291,7 @@ static int llog_osd_declare_write_rec(const struct lu_env *env,
                                      int idx, struct thandle *th)
 {
        struct llog_thread_info *lgi = llog_info(env);
+       __u32                   chunk_size;
        struct dt_object        *o;
        int                      rc;
 
@@ -290,7 +306,8 @@ static int llog_osd_declare_write_rec(const struct lu_env *env,
        o = loghandle->lgh_obj;
        LASSERT(o);
 
-       lgi->lgi_buf.lb_len = sizeof(struct llog_log_hdr);
+       chunk_size = loghandle->lgh_ctxt->loc_chunk_size;
+       lgi->lgi_buf.lb_len = chunk_size;
        lgi->lgi_buf.lb_buf = NULL;
        /* each time we update header */
        rc = dt_declare_record_write(env, o, &lgi->lgi_buf, 0,
@@ -302,7 +319,7 @@ static int llog_osd_declare_write_rec(const struct lu_env *env,
         * the pad record can be inserted so take into account double
         * record size
         */
-       lgi->lgi_buf.lb_len = rec->lrh_len * 2;
+       lgi->lgi_buf.lb_len = chunk_size * 2;
        lgi->lgi_buf.lb_buf = NULL;
        /* XXX: implement declared window or multi-chunks approach */
        rc = dt_declare_record_write(env, o, &lgi->lgi_buf, -1, th);
@@ -559,9 +576,10 @@ static int llog_osd_write_rec(const struct lu_env *env,
                if (rc != 0)
                        GOTO(out_remote_unlock, rc);
 
-               lgi->lgi_off = offsetof(typeof(*llh), llh_tail);
+               lgi->lgi_off =  (unsigned long)LLOG_HDR_TAIL(llh) -
+                               (unsigned long)llh;
                lgi->lgi_buf.lb_len = sizeof(llh->llh_tail);
-               lgi->lgi_buf.lb_buf = &llh->llh_tail;
+               lgi->lgi_buf.lb_buf = LLOG_HDR_TAIL(llh);
                rc = dt_record_write(env, o, &lgi->lgi_buf, &lgi->lgi_off, th);
                if (rc != 0)
                        GOTO(out_remote_unlock, rc);
index f84ffba..6738763 100644 (file)
@@ -1133,68 +1133,101 @@ static ssize_t osp_md_read(const struct lu_env *env, struct dt_object *dt,
        struct osp_device       *osp    = lu2osp_dev(dt->do_lu.lo_dev);
        struct dt_device        *dt_dev = &osp->opd_dt_dev;
        struct lu_buf           *lbuf   = &osp_env_info(env)->osi_lb2;
-       struct osp_update_request   *update;
+       struct osp_update_request   *update = NULL;
        struct object_update_reply *reply;
        struct out_read_reply      *orr;
+       char                       *ptr = rbuf->lb_buf;
        struct ptlrpc_request      *req = NULL;
+       size_t                     total_length = rbuf->lb_len;
+       size_t                     max_buf_size;
+       loff_t                     offset = *pos;
        int                        rc;
        ENTRY;
 
-       /* Because it needs send the update buffer right away,
-        * just create an update buffer, instead of attaching the
-        * update_remote list of the thandle.  */
-       update = osp_update_request_create(dt_dev);
-       if (IS_ERR(update))
-               RETURN(PTR_ERR(update));
+       /* Calculate the maxium buffer length for each read request */
+       max_buf_size = OUT_UPDATE_REPLY_SIZE - cfs_size_round(sizeof(*orr)) -
+                      cfs_size_round(sizeof(struct object_update_result)) -
+                      cfs_size_round(offsetof(struct object_update_reply,
+                                     ourp_lens[1]));
+       while (total_length > 0) {
+               size_t  read_length;
+
+               /* Because it needs send the update buffer right away,
+                * just create an update buffer, instead of attaching the
+                * update_remote list of the thandle.  */
+               update = osp_update_request_create(dt_dev);
+               if (IS_ERR(update))
+                       GOTO(out, rc = PTR_ERR(update));
+
+               read_length = total_length > max_buf_size ?
+                             max_buf_size : total_length;
+
+               rc = osp_update_rpc_pack(env, read, update, OUT_READ,
+                                        lu_object_fid(&dt->do_lu),
+                                        read_length, offset);
+               if (rc != 0) {
+                       CERROR("%s: cannot insert update: rc = %d\n",
+                              dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
+                       GOTO(out, rc);
+               }
 
-       rc = osp_update_rpc_pack(env, read, update, OUT_READ,
-                                lu_object_fid(&dt->do_lu), rbuf->lb_len, *pos);
-       if (rc != 0) {
-               CERROR("%s: cannot insert update: rc = %d\n",
-                      dt_dev->dd_lu_dev.ld_obd->obd_name, rc);
-               GOTO(out, rc);
-       }
+               rc = osp_remote_sync(env, osp, update, &req);
+               if (rc < 0)
+                       GOTO(out, rc);
 
-       rc = osp_remote_sync(env, osp, update, &req);
-       if (rc < 0)
-               GOTO(out, rc);
+               reply = req_capsule_server_sized_get(&req->rq_pill,
+                                                    &RMF_OUT_UPDATE_REPLY,
+                                                    OUT_UPDATE_REPLY_SIZE);
 
-       reply = req_capsule_server_sized_get(&req->rq_pill,
-                                            &RMF_OUT_UPDATE_REPLY,
-                                            OUT_UPDATE_REPLY_SIZE);
-       if (reply->ourp_magic != UPDATE_REPLY_MAGIC) {
-               CERROR("%s: invalid update reply magic %x expected %x:"
-                      " rc = %d\n", dt_dev->dd_lu_dev.ld_obd->obd_name,
-                      reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO);
-               GOTO(out, rc = -EPROTO);
-       }
+               if (reply->ourp_magic != UPDATE_REPLY_MAGIC) {
+                       CERROR("%s: invalid update reply magic %x expected %x:"
+                              " rc = %d\n", dt_dev->dd_lu_dev.ld_obd->obd_name,
+                              reply->ourp_magic, UPDATE_REPLY_MAGIC, -EPROTO);
+                       GOTO(out, rc = -EPROTO);
+               }
 
-       rc = object_update_result_data_get(reply, lbuf, 0);
-       if (rc < 0)
-               GOTO(out, rc);
+               rc = object_update_result_data_get(reply, lbuf, 0);
+               if (rc < 0)
+                       GOTO(out, rc);
 
-       if (lbuf->lb_len < sizeof(*orr))
-               GOTO(out, rc = -EPROTO);
+               if (lbuf->lb_len < sizeof(*orr))
+                       GOTO(out, rc = -EPROTO);
 
-       orr = lbuf->lb_buf;
-       orr_le_to_cpu(orr, orr);
+               orr = lbuf->lb_buf;
+               orr_le_to_cpu(orr, orr);
+               offset = orr->orr_offset;
+               if (orr->orr_size > max_buf_size)
+                       GOTO(out, rc = -EPROTO);
 
-       *pos = orr->orr_offset;
+               memcpy(ptr, orr->orr_data, orr->orr_size);
+               ptr += orr->orr_size;
+               total_length -= orr->orr_size;
 
-       if (orr->orr_size > rbuf->lb_len)
-               GOTO(out, rc = -EPROTO);
+               CDEBUG(D_INFO, "%s: read "DFID" pos "LPU64" len %u left %zu\n",
+                      osp->opd_obd->obd_name, PFID(lu_object_fid(&dt->do_lu)),
+                      offset, orr->orr_size, total_length);
 
-       memcpy(rbuf->lb_buf, orr->orr_data, orr->orr_size);
+               if (orr->orr_size < read_length)
+                       break;
 
-       CDEBUG(D_INFO, "%s: read "DFID" pos "LPU64" len %u\n",
+               ptlrpc_req_finished(req);
+               osp_update_request_destroy(update);
+               req = NULL;
+               update = NULL;
+       }
+
+       total_length = rbuf->lb_len - total_length;
+       *pos = offset;
+       CDEBUG(D_INFO, "%s: total read "DFID" pos "LPU64" len %zu\n",
               osp->opd_obd->obd_name, PFID(lu_object_fid(&dt->do_lu)),
-              *pos, orr->orr_size);
-       GOTO(out, rc = (int)orr->orr_size);
+              *pos, total_length);
+       GOTO(out, rc = (int)total_length);
 out:
        if (req != NULL)
                ptlrpc_req_finished(req);
 
-       osp_update_request_destroy(update);
+       if (update != NULL)
+               osp_update_request_destroy(update);
 
        return rc;
 }
index f2e65f5..7052504 100644 (file)
@@ -631,9 +631,12 @@ static int out_read(struct tgt_session_info *tsi)
        }
        pos = le64_to_cpu(*(__u64 *)(tmp));
 
+       /* Check if the read buffer can hold the read_size */
        if (size > OUT_UPDATE_REPLY_SIZE -
-                  cfs_size_round((unsigned long)update_result->our_data -
-                                 (unsigned long)update_result) - sizeof(pos)) {
+                  cfs_size_round(offsetof(struct object_update_reply,
+                                          ourp_lens[1])) -
+                  cfs_size_round(sizeof(*update_result)) -
+                  cfs_size_round(sizeof(*orr))) {
                CERROR("%s: get %zu the biggest read size is %d: rc = %d\n",
                       tgt_name(tsi->tsi_tgt), size, OUT_UPDATE_REPLY_SIZE,
                       -EPROTO);
@@ -941,13 +944,11 @@ int out_handle(struct tgt_session_info *tsi)
        tti->tti_u.update.tti_update_reply = reply;
        tti->tti_mult_trans = !req_is_replay(tgt_ses_req(tsi));
 
-       /* Walk through updates in the request to execute them synchronously */
+       /* validate the request and calculate the total update count and
+        * set it to reply */
        for (i = 0; i < update_buf_count; i++) {
-               struct tgt_handler      *h;
-               struct dt_object        *dt_obj;
-               int                     update_count;
                struct object_update_request *our;
-               int                     j;
+               int                     update_count;
 
                our = update_bufs[i];
                if (ptlrpc_req_need_swab(pill->rc_req))
@@ -958,11 +959,22 @@ int out_handle(struct tgt_session_info *tsi)
                               " expect %x: rc = %d\n",
                               tgt_name(tsi->tsi_tgt), our->ourq_magic,
                               UPDATE_REQUEST_MAGIC, -EPROTO);
-                       GOTO(out, rc = -EPROTO);
+                       GOTO(out_free, rc = -EPROTO);
                }
+               update_count = our->ourq_count;
+               reply->ourp_count += update_count;
+       }
+       /* Walk through updates in the request to execute them */
+       for (i = 0; i < update_buf_count; i++) {
+               struct tgt_handler      *h;
+               struct dt_object        *dt_obj;
+               int                     update_count;
+               struct object_update_request *our;
+               int                     j;
 
+               our = update_bufs[i];
                update_count = our->ourq_count;
-               reply->ourp_count += update_count;
                for (j = 0; j < update_count; j++) {
                        update = object_update_request_get(our, j, NULL);
                        if (update == NULL)