#define DEBUG_SUBSYSTEM S_OSD
-#include <lustre_ver.h>
#include <libcfs/libcfs.h>
#include <obd_support.h>
#include <lustre_net.h>
#include <obd_class.h>
#include <lustre_disk.h>
#include <lustre_fid.h>
-#include <lustre/lustre_idl.h> /* LLOG_MIN_CHUNK_SIZE definition */
+#include <lustre_quota.h>
#include "osd_internal.h"
* as llog or last_rcvd files. We needn't enforce quota on those
* objects, so always set the lqi_space as 0. */
RETURN(osd_declare_quota(env, osd, obj->oo_attr.la_uid,
- obj->oo_attr.la_gid, 0, oh, true, NULL,
- false));
+ obj->oo_attr.la_gid, obj->oo_attr.la_projid,
+ 0, oh, NULL, OSD_QID_BLK));
}
static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
dmu_buf_rele((void *)ptr, osd_0copy_tag);
atomic_dec(&osd->od_zerocopy_pin);
} else if (lnb[i].lnb_data != NULL) {
+ int j, apages, abufsz;
+ abufsz = arc_buf_size(lnb[i].lnb_data);
+ apages = abufsz / PAGE_SIZE;
+ /* these references to pages must be invalidated
+ * to prevent access in osd_bufs_put() */
+ for (j = 0; j < apages; j++)
+ lnb[i + j].lnb_page = NULL;
dmu_return_arcbuf(lnb[i].lnb_data);
atomic_dec(&osd->od_zerocopy_loan);
}
static inline struct page *kmem_to_page(void *addr)
{
+ LASSERT(!((unsigned long)addr & ~PAGE_MASK));
if (is_vmalloc_addr(addr))
return vmalloc_to_page(addr);
else
* \retval negative error number of failure
*/
static int osd_bufs_get_read(const struct lu_env *env, struct osd_object *obj,
- loff_t off, ssize_t len, struct niobuf_local *lnb)
+ loff_t off, ssize_t len, struct niobuf_local *lnb)
{
struct osd_device *osd = osd_obj2dev(obj);
unsigned long start = cfs_time_current();
RETURN(rc);
}
+static inline arc_buf_t *osd_request_arcbuf(dnode_t *dn, size_t bs)
+{
+ arc_buf_t *abuf;
+ /*
+ * Request a \a bs byte ARC buffer through the bonus dbuf of \a dn.
+ *
+ * The caller has to tell three results apart:
+ * - ERR_PTR(-ENOMEM): dmu_request_arcbuf() returned NULL;
+ * - NULL: a buffer was obtained but its data pointer is not page
+ *   aligned, so it has already been given back with
+ *   dmu_return_arcbuf() (only compiled in for ZFS < 0.7.0);
+ * - any other pointer: the buffer obtained from the DMU.
+ */
+ abuf = dmu_request_arcbuf(&dn->dn_bonus->db, bs);
+ if (unlikely(!abuf))
+ return ERR_PTR(-ENOMEM);
+
+#if ZFS_VERSION_CODE < OBD_OCD_VERSION(0, 7, 0, 0)
+ /**
+ * ZFS prior to 0.7.0 doesn't guarantee PAGE_SIZE alignment for zio
+ * blocks smaller than (PAGE_SIZE << 2). That is a problem when
+ * setting up the page array for an RDMA transfer, so an unaligned
+ * buffer is returned to the DMU here and NULL (not an error
+ * pointer) is reported to the caller. See LU-9305.
+ */
+ if ((unsigned long)abuf->b_data & ~PAGE_MASK) {
+ dmu_return_arcbuf(abuf);
+ return NULL;
+ }
+#endif
+ /* the buffer passed every check above; hand it to the caller */
+ return abuf;
+}
+
static int osd_bufs_get_write(const struct lu_env *env, struct osd_object *obj,
- loff_t off, ssize_t len, struct niobuf_local *lnb)
+ loff_t off, ssize_t len, struct niobuf_local *lnb)
{
struct osd_device *osd = osd_obj2dev(obj);
int plen, off_in_block, sz_in_block;
off_in_block = off & (bs - 1);
sz_in_block = min_t(int, bs - off_in_block, len);
+ abuf = NULL;
if (sz_in_block == bs) {
/* full block, try to use zerocopy */
+ abuf = osd_request_arcbuf(dn, bs);
+ if (unlikely(IS_ERR(abuf)))
+ GOTO(out_err, rc = PTR_ERR(abuf));
+ }
- abuf = dmu_request_arcbuf(&dn->dn_bonus->db, bs);
- if (unlikely(abuf == NULL))
- GOTO(out_err, rc = -ENOMEM);
-
+ if (abuf != NULL) {
atomic_inc(&osd->od_zerocopy_loan);
/* go over pages arcbuf contains, put them as
static int osd_bufs_get(const struct lu_env *env, struct dt_object *dt,
loff_t offset, ssize_t len, struct niobuf_local *lnb,
- int rw)
+ enum dt_bufs_type rw) /* tested as a bit mask below: the
+ * DT_BUFS_TYPE_WRITE bit selects the write
+ * helper, any other value the read helper;
+ * the helper's return code is passed back
+ * to the caller unchanged */
{
struct osd_object *obj = osd_dt_obj(dt);
int rc;
LASSERT(dt_object_exists(dt));
LASSERT(obj->oo_dn);
- if (rw == 0)
- rc = osd_bufs_get_read(env, obj, offset, len, lnb);
- else
+ if (rw & DT_BUFS_TYPE_WRITE) /* write bit set in \a rw */
rc = osd_bufs_get_write(env, obj, offset, len, lnb);
+ else /* write bit clear: dispatch to the read helper */
+ rc = osd_bufs_get_read(env, obj, offset, len, lnb);
return rc;
}
uint32_t size = 0;
uint32_t blksz = obj->oo_dn->dn_datablksz;
int i, rc, flags = 0;
- bool ignore_quota = false, synced = false;
+ bool synced = false;
long long space = 0;
struct page *last_page = NULL;
unsigned long discont_pages = 0;
+ enum osd_qid_declare_flags declare_flags = OSD_QID_BLK;
ENTRY;
LASSERT(dt_object_exists(dt));
if ((lnb[i].lnb_flags & OBD_BRW_NOQUOTA) ||
(lnb[i].lnb_flags & (OBD_BRW_FROM_GRANT | OBD_BRW_SYNC)) ==
OBD_BRW_FROM_GRANT)
- ignore_quota = true;
+ declare_flags |= OSD_QID_FORCE;
+
if (size == 0) {
/* first valid lnb */
offset = lnb[i].lnb_file_offset;
retry:
/* acquire quota space if needed */
rc = osd_declare_quota(env, osd, obj->oo_attr.la_uid,
- obj->oo_attr.la_gid, space, oh, true, &flags,
- ignore_quota);
+ obj->oo_attr.la_gid, obj->oo_attr.la_projid,
+ space, oh, &flags, declare_flags);
if (!synced && rc == -EDQUOT && (flags & QUOTA_FL_SYNC) != 0) {
dt_sync(env, th->th_dev);
lnb[0].lnb_flags |= OBD_BRW_OVER_USRQUOTA;
if (flags & QUOTA_FL_OVER_GRPQUOTA)
lnb[0].lnb_flags |= OBD_BRW_OVER_GRPQUOTA;
+#ifdef ZFS_PROJINHERIT
+ if (flags & QUOTA_FL_OVER_PRJQUOTA)
+ lnb[0].lnb_flags |= OBD_BRW_OVER_PRJQUOTA;
+#endif
RETURN(rc);
}
continue;
}
+ if (new_size < lnb[i].lnb_file_offset + lnb[i].lnb_len)
+ new_size = lnb[i].lnb_file_offset + lnb[i].lnb_len;
+ if (lnb[i].lnb_page == NULL)
+ continue;
+
if (lnb[i].lnb_page->mapping == (void *)obj) {
osd_dmu_write(osd, obj->oo_dn, lnb[i].lnb_file_offset,
lnb[i].lnb_len, kmap(lnb[i].lnb_page),
oh->ot_tx);
kunmap(lnb[i].lnb_page);
+ iosize += lnb[i].lnb_len;
} else if (lnb[i].lnb_data) {
+ int j, apages, abufsz;
LASSERT(((unsigned long)lnb[i].lnb_data & 1) == 0);
/* buffer loaned for zerocopy, try to use it.
* notice that dmu_assign_arcbuf() is smart
* enough to recognize changed blocksize
* in this case it fallbacks to dmu_write() */
+ abufsz = arc_buf_size(lnb[i].lnb_data);
+ LASSERT(abufsz & PAGE_MASK);
+ apages = abufsz / PAGE_SIZE;
+ LASSERT(i + apages <= npages);
+ /* these references to pages must be invalidated
+ * to prevent access in osd_bufs_put() */
+ for (j = 0; j < apages; j++)
+ lnb[i + j].lnb_page = NULL;
dmu_assign_arcbuf(&obj->oo_dn->dn_bonus->db,
lnb[i].lnb_file_offset,
lnb[i].lnb_data, oh->ot_tx);
* will be releasing it - bad! */
lnb[i].lnb_data = NULL;
atomic_dec(&osd->od_zerocopy_loan);
+ iosize += abufsz;
}
- if (new_size < lnb[i].lnb_file_offset + lnb[i].lnb_len)
- new_size = lnb[i].lnb_file_offset + lnb[i].lnb_len;
- iosize += lnb[i].lnb_len;
}
up_read(&obj->oo_guard);
}
RETURN(osd_declare_quota(env, osd, obj->oo_attr.la_uid,
- obj->oo_attr.la_gid, 0, oh, true, NULL,
- false));
+ obj->oo_attr.la_gid, obj->oo_attr.la_projid,
+ 0, oh, NULL, OSD_QID_BLK));
}
static int osd_ladvise(const struct lu_env *env, struct dt_object *dt,