Whamcloud - gitweb
LU-13132 osd: osd-zfs to cache dbufs for llog objects 22/37222/55
authorAlex Zhuravlev <bzzz@whamcloud.com>
Wed, 18 Aug 2021 08:09:49 +0000 (11:09 +0300)
committerOleg Drokin <green@whamcloud.com>
Tue, 18 Apr 2023 03:21:38 +0000 (03:21 +0000)
working set for llog objects is tiny and very predictable. osd-zfs
can cache couple dbufs (first block storing the header and last
block for new records).

for sanity/60a (llog test) it gives 5939307 hits and 5776 misses
while average osd_write() goes down from 1.09 usec to 0.27 usec,
total time for sanity/60a: before - 153s, after - 101s.

this approach can be used in few other cases like last_rcvd.

Change-Id: Icc0126658894085d33ef79ae41ac6c1ed4140f4c
Signed-off-by: Alex Zhuravlev <bzzz@whamcloud.com>
Reviewed-on: https://review.whamcloud.com/c/fs/lustre-release/+/37222
Tested-by: jenkins <devops@whamcloud.com>
Tested-by: Maloo <maloo@whamcloud.com>
Reviewed-by: Oleg Drokin <green@whamcloud.com>
Reviewed-by: Brian Behlendorf <behlendorf1@llnl.gov>
Reviewed-by: Andreas Dilger <adilger@whamcloud.com>
lustre/osd-zfs/osd_internal.h
lustre/osd-zfs/osd_io.c
lustre/osd-zfs/osd_object.c

index 176ef7b..4e20832 100644 (file)
@@ -422,6 +422,8 @@ enum osd_destroy_type {
        OSD_DESTROY_ASYNC = 2,
 };
 
+#define OSD_MAX_DBUFS  2       /* how many dbufs to cache in object */
+
 struct osd_object {
        struct dt_object         oo_dt;
        /*
@@ -475,6 +477,7 @@ struct osd_object {
                uint64_t        oo_parent; /* used only at object creation */
        };
        struct lu_object_header *oo_header;
+       dmu_buf_t *oo_dbs[OSD_MAX_DBUFS];
 };
 
 int osd_statfs(const struct lu_env *, struct dt_device *, struct obd_statfs *,
@@ -1175,4 +1178,6 @@ osd_index_backup(const struct lu_env *env, struct osd_device *osd, bool backup)
 #define osd_dmu_offset_next(os, obj, hole, res) (EOPNOTSUPP)
 #endif
 
+extern char osd_0copy_tag[];
+
 #endif /* _OSD_INTERNAL_H */
index 5e7708c..34441b0 100644 (file)
@@ -60,7 +60,7 @@
 #include <sys/sa_impl.h>
 #include <sys/txg.h>
 
-static char osd_0copy_tag[] = "zerocopy";
+char osd_0copy_tag[] = "zerocopy";
 
 static void dbuf_set_pending_evict(dmu_buf_t *db)
 {
@@ -239,9 +239,86 @@ static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
                                 0, oh, NULL, OSD_QID_BLK));
 }
 
+static dmu_buf_t *osd_get_dbuf(struct osd_object *obj, uint64_t offset)
+{
+       dmu_buf_t **dbs = obj->oo_dbs;
+       uint64_t blkid;
+       int i;
+
+       blkid = dbuf_whichblock(obj->oo_dn, 0, offset);
+       for (i = 0; i < OSD_MAX_DBUFS; i++) {
+               dmu_buf_impl_t *dbi = (void *)dbs[i];
+               if (!dbs[i])
+                       continue;
+               if (dbi->db_blkid == blkid)
+                       return dbs[i];
+       }
+       return (dmu_buf_t *)dbuf_hold(obj->oo_dn, blkid, osd_0copy_tag);
+}
+
+static void osd_put_dbuf(struct osd_object *obj, dmu_buf_t *db)
+{
+       dmu_buf_t **dbs = obj->oo_dbs;
+       int i;
+
+       for (i = 0; i < OSD_MAX_DBUFS; i++) {
+               if (dbs[i] == db)
+                       return;
+       }
+       /* get rid of dbuf with blkd > 0 */
+       for (i = 0; i < OSD_MAX_DBUFS; i++) {
+               if (dbs[i] == NULL) {
+                       dbs[i] = db;
+                       return;
+               }
+               if (dbs[i]->db_offset > 0) {
+                       /* replace this one */
+                       dbuf_rele((dmu_buf_impl_t *)dbs[i], osd_0copy_tag);
+                       dbs[i] = db;
+                       return;
+               }
+       }
+       LBUG();
+}
+
+static ssize_t osd_write_llog_header(struct osd_object *obj,
+                                    const struct lu_buf *buf, loff_t *pos,
+                                    struct osd_thandle *oh)
+{
+       int bufoff, tocpy;
+       int len = buf->lb_len;
+       loff_t offset = *pos;
+       char *data = buf->lb_buf;
+
+       while (len > 0) {
+               dmu_buf_t *db = osd_get_dbuf(obj, offset);
+
+               bufoff = offset - db->db_offset;
+               tocpy = MIN(db->db_size - bufoff, len);
+               if (tocpy == db->db_size)
+                       dmu_buf_will_fill(db, oh->ot_tx);
+               else
+                       dmu_buf_will_dirty(db, oh->ot_tx);
+               LASSERT(offset >= db->db_offset);
+               LASSERT(offset + tocpy <= db->db_offset + db->db_size);
+               (void) memcpy((char *)db->db_data + bufoff, data, tocpy);
+
+               if (tocpy == db->db_size)
+                       dmu_buf_fill_done(db, oh->ot_tx);
+
+               offset += tocpy;
+               data += tocpy;
+               len -= tocpy;
+
+               osd_put_dbuf(obj, db);
+       }
+
+       return 0;
+}
+
 static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
                        const struct lu_buf *buf, loff_t *pos,
-                        struct thandle *th)
+                       struct thandle *th)
 {
        struct osd_object  *obj  = osd_dt_obj(dt);
        struct osd_device  *osd = osd_obj2dev(obj);
@@ -261,8 +338,12 @@ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
        if (obj->oo_destroyed)
                GOTO(out, rc = -ENOENT);
 
-       osd_dmu_write(osd, obj->oo_dn, offset, (uint64_t)buf->lb_len,
-                     buf->lb_buf, oh->ot_tx);
+       if (fid_is_llog(lu_object_fid(&dt->do_lu))) {
+               osd_write_llog_header(obj, buf, pos, oh);
+       } else {
+               osd_dmu_write(osd, obj->oo_dn, offset, (uint64_t)buf->lb_len,
+                             buf->lb_buf, oh->ot_tx);
+       }
        write_lock(&obj->oo_attr_lock);
        if (obj->oo_attr.la_size < offset + buf->lb_len) {
                obj->oo_attr.la_size = offset + buf->lb_len;
index 9332883..0b2753a 100644 (file)
@@ -310,6 +310,7 @@ struct lu_object *osd_object_alloc(const struct lu_env *env,
                struct lu_object *l;
                struct lu_object_header *h;
                struct osd_device *o = osd_dev(d);
+               int i;
 
                l = &mo->oo_dt.do_lu;
                if (unlikely(o->od_in_init)) {
@@ -336,6 +337,8 @@ struct lu_object *osd_object_alloc(const struct lu_env *env,
                init_rwsem(&mo->oo_guard);
                rwlock_init(&mo->oo_attr_lock);
                mo->oo_destroy = OSD_DESTROY_NONE;
+               for (i = 0; i < OSD_MAX_DBUFS; i++)
+                       mo->oo_dbs[i] = NULL;
                return l;
        } else {
                return NULL;
@@ -902,6 +905,13 @@ static void osd_object_delete(const struct lu_env *env, struct lu_object *l)
 {
        struct osd_object *obj = osd_obj(l);
        const struct lu_fid *fid = lu_object_fid(l);
+       dmu_buf_t **dbs = obj->oo_dbs;
+       int i;
+
+       for (i = 0; i < OSD_MAX_DBUFS; i++) {
+               if (dbs[i])
+                       dbuf_rele((dmu_buf_impl_t *)dbs[i], osd_0copy_tag);
+       }
 
        if (obj->oo_dn) {
                if (likely(!fid_is_acct(fid))) {