* Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
* Use is subject to license terms.
*
- * Copyright (c) 2012, 2016, Intel Corporation.
+ * Copyright (c) 2012, 2017, Intel Corporation.
*/
/*
* This file is part of Lustre, http://www.lustre.org/
#define DEBUG_SUBSYSTEM S_OSD
-#include <lustre_ver.h>
#include <libcfs/libcfs.h>
#include <obd_support.h>
#include <lustre_net.h>
#include <obd_class.h>
#include <lustre_disk.h>
#include <lustre_fid.h>
-#include <lustre/lustre_idl.h> /* LLOG_MIN_CHUNK_SIZE definition */
+#include <lustre_quota.h>
#include "osd_internal.h"
static char *osd_0copy_tag = "zerocopy";
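+/* mark a dbuf for eviction, so it is dropped from the ARC as soon as the
+ * last hold on it is released instead of staying cached */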
+static void dbuf_set_pending_evict(dmu_buf_t *db)
+{
+ dmu_buf_impl_t *dbi = (dmu_buf_impl_t *)db;
+ dbi->db_pending_evict = TRUE;
+}
+
static void record_start_io(struct osd_device *osd, int rw, int discont_pages)
{
struct obd_histogram *h = osd->od_brw_stats.hist;
static void record_end_io(struct osd_device *osd, int rw,
			  unsigned long elapsed, int disksize, int npages)
{
struct obd_histogram *h = osd->od_brw_stats.hist;
- if (rw == READ) {
+ if (rw == READ)
atomic_dec(&osd->od_r_in_flight);
- lprocfs_oh_tally_log2(&h[BRW_R_PAGES], npages);
- if (disksize > 0)
- lprocfs_oh_tally_log2(&h[BRW_R_DISK_IOSIZE], disksize);
- if (elapsed)
- lprocfs_oh_tally_log2(&h[BRW_R_IO_TIME], elapsed);
-
- } else {
+ else
atomic_dec(&osd->od_w_in_flight);
- lprocfs_oh_tally_log2(&h[BRW_W_PAGES], npages);
- if (disksize > 0)
- lprocfs_oh_tally_log2(&h[BRW_W_DISK_IOSIZE], disksize);
- if (elapsed)
- lprocfs_oh_tally_log2(&h[BRW_W_IO_TIME], elapsed);
- }
+
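+ /* the BRW_W_* histogram slots directly follow their BRW_R_* pairs and
+ * rw is READ (0) or WRITE (1), so offset the read index by rw */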
+ lprocfs_oh_tally_log2(&h[BRW_R_PAGES + rw], npages);
+ if (disksize > 0)
+ lprocfs_oh_tally_log2(&h[BRW_R_DISK_IOSIZE + rw], disksize);
+ if (elapsed)
+ lprocfs_oh_tally_log2(&h[BRW_R_IO_TIME + rw], elapsed);
}
-static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
- struct lu_buf *buf, loff_t *pos)
+static ssize_t __osd_read(const struct lu_env *env, struct dt_object *dt,
+ struct lu_buf *buf, loff_t *pos, size_t *size)
{
- struct osd_object *obj = osd_dt_obj(dt);
- struct osd_device *osd = osd_obj2dev(obj);
- uint64_t old_size;
- int size = buf->lb_len;
- int rc;
- unsigned long start;
+ struct osd_object *obj = osd_dt_obj(dt);
+ uint64_t old_size;
+ int rc;
LASSERT(dt_object_exists(dt));
LASSERT(obj->oo_dn);
- start = cfs_time_current();
-
read_lock(&obj->oo_attr_lock);
old_size = obj->oo_attr.la_size;
read_unlock(&obj->oo_attr_lock);
- if (*pos + size > old_size) {
+ if (*pos + *size > old_size) {
if (old_size < *pos)
return 0;
- else
- size = old_size - *pos;
+
+ *size = old_size - *pos;
}
- record_start_io(osd, READ, 0);
+ rc = osd_dmu_read(osd_obj2dev(obj), obj->oo_dn, *pos, *size,
+ buf->lb_buf, DMU_READ_PREFETCH);
+ if (!rc) {
+ rc = *size;
+ *pos += *size;
+ }
- rc = osd_dmu_read(osd, obj->oo_dn, *pos, size, buf->lb_buf,
- DMU_READ_PREFETCH);
+ return rc;
+}
+
+static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
+ struct lu_buf *buf, loff_t *pos)
+{
+ struct osd_device *osd = osd_obj2dev(osd_dt_obj(dt));
+ size_t size = buf->lb_len;
+ ktime_t start;
+ s64 delta_ms;
+ int rc;
+
+ start = ktime_get();
+ record_start_io(osd, READ, 0);
+ rc = __osd_read(env, dt, buf, pos, &size);
+ delta_ms = ktime_ms_delta(ktime_get(), start);
+ record_end_io(osd, READ, delta_ms, size, size >> PAGE_SHIFT);
- record_end_io(osd, READ, cfs_time_current() - start, size,
- size >> PAGE_SHIFT);
- if (rc == 0) {
- rc = size;
- *pos += size;
- }
return rc;
}
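+/* as osd_read(), but without accounting the read in brw_stats; used for
+ * internal reads by OI scrub (see osd_body_scrub_ops at the end of file) */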
+static inline ssize_t osd_read_no_record(const struct lu_env *env,
+ struct dt_object *dt,
+ struct lu_buf *buf, loff_t *pos)
+{
+ size_t size = buf->lb_len;
+
+ return __osd_read(env, dt, buf, pos, &size);
+}
+
static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
const struct lu_buf *buf, loff_t pos,
struct thandle *th)
* as llog or last_rcvd files. We needn't enforce quota on those
* objects, so always set lqi_space to 0. */
RETURN(osd_declare_quota(env, osd, obj->oo_attr.la_uid,
- obj->oo_attr.la_gid, 0, oh, true, NULL,
- false));
+ obj->oo_attr.la_gid, obj->oo_attr.la_projid,
+ 0, oh, NULL, OSD_QID_BLK));
}
static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
const struct lu_buf *buf, loff_t *pos,
- struct thandle *th, int ignore_quota)
+ struct thandle *th)
{
struct osd_object *obj = osd_dt_obj(dt);
struct osd_device *osd = osd_obj2dev(obj);
dmu_buf_rele((void *)ptr, osd_0copy_tag);
atomic_dec(&osd->od_zerocopy_pin);
} else if (lnb[i].lnb_data != NULL) {
+ int j, apages, abufsz;
+ abufsz = arc_buf_size(lnb[i].lnb_data);
+ apages = abufsz / PAGE_SIZE;
+ /* these references to pages must be invalidated
+ * to prevent access in osd_bufs_put() */
+ for (j = 0; j < apages; j++)
+ lnb[i + j].lnb_page = NULL;
dmu_return_arcbuf(lnb[i].lnb_data);
atomic_dec(&osd->od_zerocopy_loan);
}
* \retval negative error number on failure
*/
static int osd_bufs_get_read(const struct lu_env *env, struct osd_object *obj,
- loff_t off, ssize_t len, struct niobuf_local *lnb)
+ loff_t off, ssize_t len, struct niobuf_local *lnb)
{
struct osd_device *osd = osd_obj2dev(obj);
- unsigned long start = cfs_time_current();
- int rc, i, numbufs, npages = 0;
- dmu_buf_t **dbp;
- ENTRY;
+ int rc, i, numbufs, npages = 0, drop_cache = 0;
+ ktime_t start = ktime_get();
+ dmu_buf_t **dbp;
+ s64 delta_ms;
+ ENTRY;
record_start_io(osd, READ, 0);
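+ /* for files larger than readcache_max_filesize evict the dbufs once the
+ * read completes, so large reads don't keep their data cached in the ARC */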
+ if (obj->oo_attr.la_size >= osd->od_readcache_max_filesize)
+ drop_cache = 1;
+
/* grab buffers for read:
* the OSD API lets us grab buffers first, then initiate IO(s)
* so that all required IOs will be done in parallel, but at the
* If we discover this is vital for good performance we
* can implement our own replacement for dmu_buf_hold_array_by_bonus().
*/
- while (len > 0) {
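+ /* a dnode with dn_datablkshift == 0 has a single data block whose size
+ * need not be a power of two; don't read past the end of that block */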
+ while (len > 0 &&
+ (obj->oo_dn->dn_datablkshift != 0 ||
+ off < obj->oo_dn->dn_datablksz)) {
+ if (obj->oo_dn->dn_datablkshift == 0 &&
+ off + len > obj->oo_dn->dn_datablksz)
+ len = obj->oo_dn->dn_datablksz - off;
+
rc = -dmu_buf_hold_array_by_bonus(&obj->oo_dn->dn_bonus->db,
off, len, TRUE, osd_0copy_tag,
&numbufs, &dbp);
lnb++;
}
+ if (drop_cache)
+ dbuf_set_pending_evict(dbp[i]);
+
/* steal dbuf so dmu_buf_rele_array() can't release
* it */
dbp[i] = NULL;
dmu_buf_rele_array(dbp, numbufs, osd_0copy_tag);
}
- record_end_io(osd, READ, cfs_time_current() - start,
- npages * PAGE_SIZE, npages);
+ delta_ms = ktime_ms_delta(ktime_get(), start);
+ record_end_io(osd, READ, delta_ms, npages * PAGE_SIZE, npages);
RETURN(npages);
}
static int osd_bufs_get_write(const struct lu_env *env, struct osd_object *obj,
- loff_t off, ssize_t len, struct niobuf_local *lnb)
+ loff_t off, ssize_t len, struct niobuf_local *lnb)
{
struct osd_device *osd = osd_obj2dev(obj);
- int plen, off_in_block, sz_in_block;
+ int poff, plen, off_in_block, sz_in_block;
int rc, i = 0, npages = 0;
dnode_t *dn = obj->oo_dn;
arc_buf_t *abuf;
LPROC_OSD_TAIL_IO, 1);
/* can't use zerocopy, allocate temp. buffers */
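+ /* the write may start at a non page-aligned offset, so the first
+ * fragment covers only poff..PAGE_SIZE of its page */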
+ poff = off & (PAGE_SIZE - 1);
while (sz_in_block > 0) {
- plen = min_t(int, sz_in_block, PAGE_SIZE);
+ plen = min_t(int, poff + sz_in_block,
+ PAGE_SIZE);
+ plen -= poff;
lnb[i].lnb_file_offset = off;
- lnb[i].lnb_page_offset = 0;
+ lnb[i].lnb_page_offset = poff;
+ poff = 0;
+
lnb[i].lnb_len = plen;
lnb[i].lnb_rc = 0;
lnb[i].lnb_data = NULL;
static int osd_bufs_get(const struct lu_env *env, struct dt_object *dt,
loff_t offset, ssize_t len, struct niobuf_local *lnb,
- int rw)
+ enum dt_bufs_type rw)
{
struct osd_object *obj = osd_dt_obj(dt);
int rc;
LASSERT(dt_object_exists(dt));
LASSERT(obj->oo_dn);
- if (rw == 0)
- rc = osd_bufs_get_read(env, obj, offset, len, lnb);
- else
+ if (rw & DT_BUFS_TYPE_WRITE)
rc = osd_bufs_get_write(env, obj, offset, len, lnb);
+ else
+ rc = osd_bufs_get_read(env, obj, offset, len, lnb);
return rc;
}
uint32_t size = 0;
uint32_t blksz = obj->oo_dn->dn_datablksz;
int i, rc, flags = 0;
- bool ignore_quota = false, synced = false;
+ bool synced = false;
long long space = 0;
struct page *last_page = NULL;
unsigned long discont_pages = 0;
+ enum osd_qid_declare_flags declare_flags = OSD_QID_BLK;
ENTRY;
LASSERT(dt_object_exists(dt));
if ((lnb[i].lnb_flags & OBD_BRW_NOQUOTA) ||
(lnb[i].lnb_flags & (OBD_BRW_FROM_GRANT | OBD_BRW_SYNC)) ==
OBD_BRW_FROM_GRANT)
- ignore_quota = true;
+ declare_flags |= OSD_QID_FORCE;
+
if (size == 0) {
/* first valid lnb */
offset = lnb[i].lnb_file_offset;
space += osd_roundup2blocksz(size, offset, blksz);
}
- oh->ot_write_commit = 1; /* used in osd_trans_start() for fail_loc */
-
/* backend zfs filesystem might be configured to store multiple data
* copies */
space *= osd->od_os->os_copies;
retry:
/* acquire quota space if needed */
rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid,
- obj->oo_attr.la_gid, space, oh, true, &flags,
- ignore_quota);
+ obj->oo_attr.la_gid, obj->oo_attr.la_projid,
+ space, oh, &flags, declare_flags);
if (!synced && rc == -EDQUOT && (flags & QUOTA_FL_SYNC) != 0) {
dt_sync(env, th->th_dev);
lnb[0].lnb_flags |= OBD_BRW_OVER_USRQUOTA;
if (flags & QUOTA_FL_OVER_GRPQUOTA)
lnb[0].lnb_flags |= OBD_BRW_OVER_GRPQUOTA;
+#ifdef ZFS_PROJINHERIT
+ if (flags & QUOTA_FL_OVER_PRJQUOTA)
+ lnb[0].lnb_flags |= OBD_BRW_OVER_PRJQUOTA;
+#endif
RETURN(rc);
}
return rc;
}
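+/* re-take holds on the dbufs backing [off, off + len) and mark them for
+ * eviction, so data just written to a large file is dropped from the ARC */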
+static void osd_evict_dbufs_after_write(struct osd_object *obj,
+ loff_t off, ssize_t len)
+{
+ dmu_buf_t **dbp;
+ int i, rc, numbufs;
+
+ rc = -dmu_buf_hold_array_by_bonus(&obj->oo_dn->dn_bonus->db, off, len,
+ TRUE, osd_0copy_tag, &numbufs, &dbp);
+ if (unlikely(rc))
+ return;
+
+ for (i = 0; i < numbufs; i++)
+ dbuf_set_pending_evict(dbp[i]);
+
+ dmu_buf_rele_array(dbp, numbufs, osd_0copy_tag);
+}
+
static int osd_write_commit(const struct lu_env *env, struct dt_object *dt,
struct niobuf_local *lnb, int npages,
struct thandle *th)
struct osd_device *osd = osd_obj2dev(obj);
struct osd_thandle *oh;
uint64_t new_size = 0;
- int i, rc = 0;
+ int i, abufsz, rc = 0, drop_cache = 0;
unsigned long iosize = 0;
ENTRY;
lnb[npages - 1].lnb_file_offset +
lnb[npages - 1].lnb_len);
+ if (obj->oo_attr.la_size >= osd->od_readcache_max_filesize ||
+ lnb[npages - 1].lnb_file_offset + lnb[npages - 1].lnb_len >=
+ osd->od_readcache_max_filesize)
+ drop_cache = 1;
+
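+ /* fault injection: pretend the commit ran out of space */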
+ if (OBD_FAIL_CHECK(OBD_FAIL_OST_MAPBLK_ENOSPC))
+ RETURN(-ENOSPC);
+
/* LU-8791: take oo_guard to avoid a deadlock when a block size change
* and an arcbuf assignment take place at the same time.
*
continue;
}
+ if (new_size < lnb[i].lnb_file_offset + lnb[i].lnb_len)
+ new_size = lnb[i].lnb_file_offset + lnb[i].lnb_len;
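+ /* nothing to copy for this fragment: e.g. its page was consumed by a
+ * loaned arcbuf in an earlier iteration */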
+ if (lnb[i].lnb_page == NULL)
+ continue;
+
if (lnb[i].lnb_page->mapping == (void *)obj) {
osd_dmu_write(osd, obj->oo_dn, lnb[i].lnb_file_offset,
- lnb[i].lnb_len, kmap(lnb[i].lnb_page),
- oh->ot_tx);
+ lnb[i].lnb_len, kmap(lnb[i].lnb_page) +
+ lnb[i].lnb_page_offset, oh->ot_tx);
kunmap(lnb[i].lnb_page);
+ iosize += lnb[i].lnb_len;
+ abufsz = lnb[i].lnb_len; /* to drop cache below */
} else if (lnb[i].lnb_data) {
+ int j, apages;
LASSERT(((unsigned long)lnb[i].lnb_data & 1) == 0);
/* buffer loaned for zerocopy, try to use it.
* notice that dmu_assign_arcbuf() is smart
* enough to recognize a changed blocksize,
* in which case it falls back to dmu_write() */
+ abufsz = arc_buf_size(lnb[i].lnb_data);
+ LASSERT(abufsz & PAGE_MASK);
+ apages = abufsz / PAGE_SIZE;
+ LASSERT(i + apages <= npages);
+ /* these references to pages must be invalidated
+ * to prevent access in osd_bufs_put() */
+ for (j = 0; j < apages; j++)
+ lnb[i + j].lnb_page = NULL;
dmu_assign_arcbuf(&obj->oo_dn->dn_bonus->db,
lnb[i].lnb_file_offset,
lnb[i].lnb_data, oh->ot_tx);
* will be releasing it - bad! */
lnb[i].lnb_data = NULL;
atomic_dec(&osd->od_zerocopy_loan);
+ iosize += abufsz;
+ } else {
+ /* we don't want to deal with the cache if nothing
+ * has been sent to ZFS at this step */
+ continue;
}
- if (new_size < lnb[i].lnb_file_offset + lnb[i].lnb_len)
- new_size = lnb[i].lnb_file_offset + lnb[i].lnb_len;
- iosize += lnb[i].lnb_len;
+ if (!drop_cache)
+ continue;
+
+ /* we have to mark dbufs for eviction here because
+ * dmu_assign_arcbuf() may create a new dbuf for
+ * the loaned abuf */
+ osd_evict_dbufs_after_write(obj, lnb[i].lnb_file_offset,
+ abufsz);
}
up_read(&obj->oo_guard);
/* declare we'll free some blocks ... */
if (start < obj->oo_attr.la_size) {
read_unlock(&obj->oo_attr_lock);
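+ /* the punch frees space overall, so allow the tx to proceed even when
+ * the pool is nearly full */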
+ dmu_tx_mark_netfree(oh->ot_tx);
dmu_tx_hold_free(oh->ot_tx, obj->oo_dn->dn_object, start, len);
} else {
read_unlock(&obj->oo_attr_lock);
}
RETURN(osd_declare_quota(env, osd, obj->oo_attr.la_uid,
- obj->oo_attr.la_gid, 0, oh, true, NULL,
- false));
+ obj->oo_attr.la_gid, obj->oo_attr.la_projid,
+ 0, oh, NULL, OSD_QID_BLK));
}
static int osd_ladvise(const struct lu_env *env, struct dt_object *dt,
.dbo_punch = osd_punch,
.dbo_ladvise = osd_ladvise,
};
+
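+/* body operations for objects handled by OI scrub: reads bypass brw_stats */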
+struct dt_body_operations osd_body_scrub_ops = {
+ .dbo_read = osd_read_no_record,
+ .dbo_declare_write = osd_declare_write,
+ .dbo_write = osd_write,
+};