#include <sys/sa_impl.h>
#include <sys/txg.h>
-static char osd_0copy_tag[] = "zerocopy";
+char osd_0copy_tag[] = "zerocopy";
static void dbuf_set_pending_evict(dmu_buf_t *db)
{
0, oh, NULL, OSD_QID_BLK));
}
+static dmu_buf_t *osd_get_dbuf(struct osd_object *obj, uint64_t offset)
+{
+ dmu_buf_t **dbs = obj->oo_dbs;
+ uint64_t blkid;
+ int i;
+
+ blkid = dbuf_whichblock(obj->oo_dn, 0, offset);
+ for (i = 0; i < OSD_MAX_DBUFS; i++) {
+ dmu_buf_impl_t *dbi = (void *)dbs[i];
+ if (!dbs[i])
+ continue;
+ if (dbi->db_blkid == blkid)
+ return dbs[i];
+ }
+ return (dmu_buf_t *)dbuf_hold(obj->oo_dn, blkid, osd_0copy_tag);
+}
+
+static void osd_put_dbuf(struct osd_object *obj, dmu_buf_t *db)
+{
+ dmu_buf_t **dbs = obj->oo_dbs;
+ int i;
+
+ for (i = 0; i < OSD_MAX_DBUFS; i++) {
+ if (dbs[i] == db)
+ return;
+ }
+ /* get rid of dbuf with blkd > 0 */
+ for (i = 0; i < OSD_MAX_DBUFS; i++) {
+ if (dbs[i] == NULL) {
+ dbs[i] = db;
+ return;
+ }
+ if (dbs[i]->db_offset > 0) {
+ /* replace this one */
+ dbuf_rele((dmu_buf_impl_t *)dbs[i], osd_0copy_tag);
+ dbs[i] = db;
+ return;
+ }
+ }
+ LBUG();
+}
+
+static ssize_t osd_write_llog_header(struct osd_object *obj,
+ const struct lu_buf *buf, loff_t *pos,
+ struct osd_thandle *oh)
+{
+ int bufoff, tocpy;
+ int len = buf->lb_len;
+ loff_t offset = *pos;
+ char *data = buf->lb_buf;
+
+ while (len > 0) {
+ dmu_buf_t *db = osd_get_dbuf(obj, offset);
+
+ bufoff = offset - db->db_offset;
+ tocpy = MIN(db->db_size - bufoff, len);
+ if (tocpy == db->db_size)
+ dmu_buf_will_fill(db, oh->ot_tx);
+ else
+ dmu_buf_will_dirty(db, oh->ot_tx);
+ LASSERT(offset >= db->db_offset);
+ LASSERT(offset + tocpy <= db->db_offset + db->db_size);
+ (void) memcpy((char *)db->db_data + bufoff, data, tocpy);
+
+ if (tocpy == db->db_size)
+ dmu_buf_fill_done(db, oh->ot_tx);
+
+ offset += tocpy;
+ data += tocpy;
+ len -= tocpy;
+
+ osd_put_dbuf(obj, db);
+ }
+
+ return 0;
+}
+
static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
const struct lu_buf *buf, loff_t *pos,
- struct thandle *th)
+ struct thandle *th)
{
struct osd_object *obj = osd_dt_obj(dt);
struct osd_device *osd = osd_obj2dev(obj);
if (obj->oo_destroyed)
GOTO(out, rc = -ENOENT);
- osd_dmu_write(osd, obj->oo_dn, offset, (uint64_t)buf->lb_len,
- buf->lb_buf, oh->ot_tx);
+ if (fid_is_llog(lu_object_fid(&dt->do_lu))) {
+ osd_write_llog_header(obj, buf, pos, oh);
+ } else {
+ osd_dmu_write(osd, obj->oo_dn, offset, (uint64_t)buf->lb_len,
+ buf->lb_buf, oh->ot_tx);
+ }
write_lock(&obj->oo_attr_lock);
if (obj->oo_attr.la_size < offset + buf->lb_len) {
obj->oo_attr.la_size = offset + buf->lb_len;
struct lu_object *l;
struct lu_object_header *h;
struct osd_device *o = osd_dev(d);
+ int i;
l = &mo->oo_dt.do_lu;
if (unlikely(o->od_in_init)) {
init_rwsem(&mo->oo_guard);
rwlock_init(&mo->oo_attr_lock);
mo->oo_destroy = OSD_DESTROY_NONE;
+ for (i = 0; i < OSD_MAX_DBUFS; i++)
+ mo->oo_dbs[i] = NULL;
return l;
} else {
return NULL;
{
struct osd_object *obj = osd_obj(l);
const struct lu_fid *fid = lu_object_fid(l);
+ dmu_buf_t **dbs = obj->oo_dbs;
+ int i;
+
+ for (i = 0; i < OSD_MAX_DBUFS; i++) {
+ if (dbs[i])
+ dbuf_rele((dmu_buf_impl_t *)dbs[i], osd_0copy_tag);
+ }
if (obj->oo_dn) {
if (likely(!fid_is_acct(fid))) {