static const struct lu_object_operations osd_lu_obj_ops;
static const struct dt_object_operations osd_obj_ops;
static const struct dt_object_operations osd_obj_ea_ops;
-static const struct dt_body_operations osd_body_ops_new;
static const struct dt_index_operations osd_index_iam_ops;
static const struct dt_index_operations osd_index_ea_ops;
oh->ot_id_type |= ((type == GRPQUOTA) ? (1 << i) : 0);
}
-static void osd_declare_qid(struct dt_object *dt, struct osd_thandle *oh,
- int type, uid_t id, struct inode *inode)
+void osd_declare_qid(struct dt_object *dt, struct osd_thandle *oh,
+ int type, uid_t id, struct inode *inode)
{
#ifdef CONFIG_QUOTA
int i, allocated = 0;
struct dt_device *d)
{
struct osd_thread_info *oti = osd_oti_get(env);
+ struct osd_iobuf *iobuf = &oti->oti_iobuf;
struct osd_thandle *oh;
struct thandle *th;
ENTRY;
+ /* on pending IO in this thread should left from prev. request */
+ LASSERT(cfs_atomic_read(&iobuf->dr_numreqs) == 0);
+
th = ERR_PTR(-ENOMEM);
OBD_ALLOC_GFP(oh, sizeof *oh, CFS_ALLOC_IO);
if (oh != NULL) {
int rc = 0;
struct osd_thandle *oh;
struct osd_thread_info *oti = osd_oti_get(env);
+ struct osd_iobuf *iobuf = &oti->oti_iobuf;
ENTRY;
OBD_FREE_PTR(oh);
}
+ /* as we want IO to journal and data IO be concurrent, we don't block
+ * awaiting data IO completion in osd_do_bio(), instead we wait here
+ * once transaction is submitted to the journal. all reqular requests
+ * don't do direct IO (except read/write), thus this wait_event becomes
+ * no-op for them.
+ *
+ * IMPORTANT: we have to wait till any IO submited by the thread is
+ * completed otherwise iobuf may be corrupted by different request
+ */
+ cfs_wait_event(iobuf->dr_wait, cfs_atomic_read(&iobuf->dr_numreqs)==0);
+ if (!rc)
+ rc = iobuf->dr_error;
+
RETURN(rc);
}
else
parent = osd->od_obj_area;
- LASSERT(parent != NULL);
- LASSERT(osd_dt_obj(parent)->oo_inode->i_op != NULL);
-
#ifdef HAVE_QUOTA_SUPPORT
osd_push_ctxt(info->oti_env, save);
#endif
inode = ldiskfs_create_inode(oth->ot_handle,
- osd_dt_obj(parent)->oo_inode, mode);
+ parent ? osd_dt_obj(parent)->oo_inode :
+ osd_sb(osd)->s_root->d_inode,
+ mode);
#ifdef HAVE_QUOTA_SUPPORT
osd_pop_ctxt(save);
#endif
.do_data_get = osd_data_get,
};
-/*
- * Body operations.
- */
-
-/*
- * XXX: Another layering violation for now.
- *
- * We don't want to use ->f_op->read methods, because generic file write
- *
- * - serializes on ->i_sem, and
- *
- * - does a lot of extra work like balance_dirty_pages(),
- *
- * which doesn't work for globally shared files like /last-received.
- */
-static int osd_ldiskfs_readlink(struct inode *inode, char *buffer, int buflen)
-{
- struct ldiskfs_inode_info *ei = LDISKFS_I(inode);
-
- memcpy(buffer, (char*)ei->i_data, buflen);
-
- return buflen;
-}
-
-static int osd_ldiskfs_read(struct inode *inode, void *buf, int size,
- loff_t *offs)
-{
- struct buffer_head *bh;
- unsigned long block;
- int osize = size;
- int blocksize;
- int csize;
- int boffs;
- int err;
-
- /* prevent reading after eof */
- spin_lock(&inode->i_lock);
- if (i_size_read(inode) < *offs + size) {
- size = i_size_read(inode) - *offs;
- spin_unlock(&inode->i_lock);
- if (size < 0) {
- CDEBUG(D_EXT2, "size %llu is too short to read @%llu\n",
- i_size_read(inode), *offs);
- return -EBADR;
- } else if (size == 0) {
- return 0;
- }
- } else {
- spin_unlock(&inode->i_lock);
- }
-
- blocksize = 1 << inode->i_blkbits;
-
- while (size > 0) {
- block = *offs >> inode->i_blkbits;
- boffs = *offs & (blocksize - 1);
- csize = min(blocksize - boffs, size);
- bh = ldiskfs_bread(NULL, inode, block, 0, &err);
- if (!bh) {
- CERROR("can't read block: %d\n", err);
- return err;
- }
-
- memcpy(buf, bh->b_data + boffs, csize);
- brelse(bh);
-
- *offs += csize;
- buf += csize;
- size -= csize;
- }
- return osize;
-}
-
-static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
- struct lu_buf *buf, loff_t *pos,
- struct lustre_capa *capa)
-{
- struct osd_object *obj = osd_dt_obj(dt);
- struct inode *inode = obj->oo_inode;
- int rc;
-
- if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
- RETURN(-EACCES);
-
- /* Read small symlink from inode body as we need to maintain correct
- * on-disk symlinks for ldiskfs.
- */
- if (S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) &&
- (buf->lb_len <= sizeof (LDISKFS_I(inode)->i_data)))
- rc = osd_ldiskfs_readlink(inode, buf->lb_buf, buf->lb_len);
- else
- rc = osd_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
-
- return rc;
-}
-
-static int osd_ldiskfs_writelink(struct inode *inode, char *buffer, int buflen)
-{
-
- memcpy((char*)&LDISKFS_I(inode)->i_data, (char *)buffer,
- buflen);
- LDISKFS_I(inode)->i_disksize = buflen;
- i_size_write(inode, buflen);
- inode->i_sb->s_op->dirty_inode(inode);
-
- return 0;
-}
-
-static int osd_ldiskfs_write_record(struct inode *inode, void *buf, int bufsize,
- loff_t *offs, handle_t *handle)
-{
- struct buffer_head *bh = NULL;
- loff_t offset = *offs;
- loff_t new_size = i_size_read(inode);
- unsigned long block;
- int blocksize = 1 << inode->i_blkbits;
- int err = 0;
- int size;
- int boffs;
- int dirty_inode = 0;
-
- while (bufsize > 0) {
- if (bh != NULL)
- brelse(bh);
-
- block = offset >> inode->i_blkbits;
- boffs = offset & (blocksize - 1);
- size = min(blocksize - boffs, bufsize);
- bh = ldiskfs_bread(handle, inode, block, 1, &err);
- if (!bh) {
- CERROR("can't read/create block: %d\n", err);
- break;
- }
-
- err = ldiskfs_journal_get_write_access(handle, bh);
- if (err) {
- CERROR("journal_get_write_access() returned error %d\n",
- err);
- break;
- }
- LASSERTF(boffs + size <= bh->b_size,
- "boffs %d size %d bh->b_size %lu",
- boffs, size, (unsigned long)bh->b_size);
- memcpy(bh->b_data + boffs, buf, size);
- err = ldiskfs_journal_dirty_metadata(handle, bh);
- if (err)
- break;
-
- if (offset + size > new_size)
- new_size = offset + size;
- offset += size;
- bufsize -= size;
- buf += size;
- }
- if (bh)
- brelse(bh);
-
- /* correct in-core and on-disk sizes */
- if (new_size > i_size_read(inode)) {
- spin_lock(&inode->i_lock);
- if (new_size > i_size_read(inode))
- i_size_write(inode, new_size);
- if (i_size_read(inode) > LDISKFS_I(inode)->i_disksize) {
- LDISKFS_I(inode)->i_disksize = i_size_read(inode);
- dirty_inode = 1;
- }
- spin_unlock(&inode->i_lock);
- if (dirty_inode)
- inode->i_sb->s_op->dirty_inode(inode);
- }
-
- if (err == 0)
- *offs = offset;
- return err;
-}
-
-static ssize_t osd_declare_write(const struct lu_env *env, struct dt_object *dt,
- const loff_t size, loff_t pos,
- struct thandle *handle)
-{
- struct osd_thandle *oh;
- int credits;
-
- LASSERT(handle != NULL);
-
- oh = container_of0(handle, struct osd_thandle, ot_super);
- LASSERT(oh->ot_handle == NULL);
-
- /* XXX: size == 0 or INT_MAX indicating a catalog header update or
- * llog write, see comment in mdd_declare_llog_record().
- *
- * This hack should be removed in 2.3
- */
- if (size == DECLARE_LLOG_REWRITE)
- credits = 2;
- else if (size == DECLARE_LLOG_WRITE)
- credits = 6;
- else
- credits = osd_dto_credits_noquota[DTO_WRITE_BLOCK];
-
- OSD_DECLARE_OP(oh, write);
- oh->ot_credits += credits;
-
- if (osd_dt_obj(dt)->oo_inode == NULL)
- return 0;
-
- osd_declare_qid(dt, oh, USRQUOTA, osd_dt_obj(dt)->oo_inode->i_uid,
- osd_dt_obj(dt)->oo_inode);
- osd_declare_qid(dt, oh, GRPQUOTA, osd_dt_obj(dt)->oo_inode->i_gid,
- osd_dt_obj(dt)->oo_inode);
- return 0;
-}
-
-static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
- const struct lu_buf *buf, loff_t *pos,
- struct thandle *handle, struct lustre_capa *capa,
- int ignore_quota)
-{
- struct osd_object *obj = osd_dt_obj(dt);
- struct inode *inode = obj->oo_inode;
- struct osd_thandle *oh;
- ssize_t result = 0;
-#ifdef HAVE_QUOTA_SUPPORT
- cfs_cap_t save = cfs_curproc_cap_pack();
-#endif
-
- LASSERT(handle != NULL);
-
- if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
- RETURN(-EACCES);
-
- oh = container_of(handle, struct osd_thandle, ot_super);
- LASSERT(oh->ot_handle->h_transaction != NULL);
-#ifdef HAVE_QUOTA_SUPPORT
- if (ignore_quota)
- cfs_cap_raise(CFS_CAP_SYS_RESOURCE);
- else
- cfs_cap_lower(CFS_CAP_SYS_RESOURCE);
-#endif
- /* Write small symlink to inode body as we need to maintain correct
- * on-disk symlinks for ldiskfs.
- */
- if(S_ISLNK(obj->oo_dt.do_lu.lo_header->loh_attr) &&
- (buf->lb_len < sizeof (LDISKFS_I(inode)->i_data)))
- result = osd_ldiskfs_writelink(inode, buf->lb_buf, buf->lb_len);
- else
- result = osd_ldiskfs_write_record(inode, buf->lb_buf,
- buf->lb_len, pos,
- oh->ot_handle);
-#ifdef HAVE_QUOTA_SUPPORT
- cfs_curproc_cap_unpack(save);
-#endif
- if (result == 0)
- result = buf->lb_len;
- return result;
-}
-
-/*
- * in some cases we may need declare methods for objects being created
- * e.g., when we create symlink
- */
-static const struct dt_body_operations osd_body_ops_new = {
- .dbo_declare_write = osd_declare_write,
-};
-
-const struct dt_body_operations osd_body_ops = {
- .dbo_read = osd_read,
- .dbo_declare_write = osd_declare_write,
- .dbo_write = osd_write
-};
-
static int osd_index_declare_iam_delete(const struct lu_env *env,
struct dt_object *dt,
const struct dt_key *key,
if (o->od_oi_table != NULL)
osd_oi_fini(info, &o->od_oi_table, o->od_oi_count);
+ if (o->od_fsops) {
+ fsfilt_put_ops(o->od_fsops);
+ o->od_fsops = NULL;
+ }
+
RETURN(0);
}
struct lustre_sb_info *lsi;
ENTRY;
+
+ o->od_fsops = fsfilt_get_ops(mt_str(LDD_MT_LDISKFS));
+ if (o->od_fsops == NULL) {
+ CERROR("Can't find fsfilt_ldiskfs\n");
+ RETURN(-ENOTSUPP);
+ }
+
if (o->od_mount != NULL) {
CERROR("Already mounted (%s)\n", dev);
RETURN(-EEXIST);