+ LINVRNT(osd_invariant(obj));
+
+ return result;
+}
+
+ /*
+  * Object operations table: wires each dt_object_operations method to its
+  * osd implementation.  Differs from osd_obj_ea_ops only in ->do_create,
+  * which is osd_object_create() here.
+  */
+ static const struct dt_object_operations osd_obj_ops = {
+         /* locking */
+         .do_read_lock    = osd_object_read_lock,
+         .do_read_unlock  = osd_object_read_unlock,
+         .do_write_lock   = osd_object_write_lock,
+         .do_write_unlock = osd_object_write_unlock,
+         .do_write_locked = osd_object_write_locked,
+
+         /* attributes */
+         .do_attr_get     = osd_attr_get,
+         .do_attr_set     = osd_attr_set,
+
+         /* creation and index setup */
+         .do_ah_init      = osd_ah_init,
+         .do_create       = osd_object_create,
+         .do_index_try    = osd_index_try,
+
+         /* reference counting */
+         .do_ref_add      = osd_object_ref_add,
+         .do_ref_del      = osd_object_ref_del,
+
+         /* extended attributes */
+         .do_xattr_get    = osd_xattr_get,
+         .do_xattr_set    = osd_xattr_set,
+         .do_xattr_del    = osd_xattr_del,
+         .do_xattr_list   = osd_xattr_list,
+
+         /* capability, sync, version and data accessors */
+         .do_capa_get     = osd_capa_get,
+         .do_object_sync  = osd_object_sync,
+         .do_version_get  = osd_object_version_get,
+         .do_version_set  = osd_object_version_set,
+         .do_data_get     = osd_data_get,
+ };
+
+/**
+ * dt_object_operations for interoperability mode
+ * (i.e. to run 2.0 mds on 1.8 disk) (b11826)
+ */
+static const struct dt_object_operations osd_obj_ea_ops = {
+ .do_read_lock = osd_object_read_lock,
+ .do_write_lock = osd_object_write_lock,
+ .do_read_unlock = osd_object_read_unlock,
+ .do_write_unlock = osd_object_write_unlock,
+ .do_write_locked = osd_object_write_locked,
+ .do_attr_get = osd_attr_get,
+ .do_attr_set = osd_attr_set,
+ .do_ah_init = osd_ah_init,
+ .do_create = osd_object_ea_create,
+ .do_index_try = osd_index_try,
+ .do_ref_add = osd_object_ref_add,
+ .do_ref_del = osd_object_ref_del,
+ .do_xattr_get = osd_xattr_get,
+ .do_xattr_set = osd_xattr_set,
+ .do_xattr_del = osd_xattr_del,
+ .do_xattr_list = osd_xattr_list,
+ .do_capa_get = osd_capa_get,
+ .do_object_sync = osd_object_sync,
+ .do_version_get = osd_object_version_get,
+ .do_version_set = osd_object_version_set,
+ .do_data_get = osd_data_get,
+};
+
+/*
+ * Body operations.
+ */
+
+/*
+ * XXX: Another layering violation for now.
+ *
+ * We don't want to use ->f_op->read methods, because generic file write
+ *
+ * - serializes on ->i_sem, and
+ *
+ * - does a lot of extra work like balance_dirty_pages(),
+ *
+ * which doesn't work for globally shared files like /last-received.
+ */
+int fsfilt_ldiskfs_read(struct inode *inode, void *buf, int size, loff_t *offs);
+int fsfilt_ldiskfs_write_handle(struct inode *inode, void *buf, int bufsize,
+ loff_t *offs, handle_t *handle);
+
+ /**
+  * Read from the body of object \a dt into \a buf.
+  *
+  * \param env  execution environment
+  * \param dt   object to read from
+  * \param buf  buffer (lb_buf) and size (lb_len) handed to
+  *             fsfilt_ldiskfs_read()
+  * \param pos  file offset, passed through to fsfilt_ldiskfs_read()
+  * \param capa capability checked against CAPA_OPC_BODY_READ
+  *
+  * \retval -EACCES osd_object_auth() rejected the request
+  * \retval other   the value returned by fsfilt_ldiskfs_read()
+  */
+ static ssize_t osd_read(const struct lu_env *env, struct dt_object *dt,
+                         struct lu_buf *buf, loff_t *pos,
+                         struct lustre_capa *capa)
+ {
+         struct inode *inode = osd_dt_obj(dt)->oo_inode;
+
+         /*
+          * This function has no ENTRY, so leave with a plain return rather
+          * than RETURN() to keep the entry/exit trace balanced.
+          */
+         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_READ))
+                 return -EACCES;
+
+         return fsfilt_ldiskfs_read(inode, buf->lb_buf, buf->lb_len, pos);
+ }
+
+ /**
+  * Write \a buf into the body of object \a dt inside transaction \a handle.
+  *
+  * When HAVE_QUOTA_SUPPORT is defined, CFS_CAP_SYS_RESOURCE_MASK is set in
+  * current->cap_effective for the duration of the write if \a ignore_quota
+  * is non-zero and cleared otherwise; the saved value is restored afterwards.
+  *
+  * \param env          execution environment
+  * \param dt           object to write to
+  * \param buf          source buffer (lb_buf) and length (lb_len)
+  * \param pos          file offset, passed through to
+  *                     fsfilt_ldiskfs_write_handle()
+  * \param handle       transaction handle, must not be NULL
+  * \param capa         capability checked against CAPA_OPC_BODY_WRITE
+  * \param ignore_quota selects the capability adjustment described above
+  *
+  * \retval -EACCES     osd_object_auth() rejected the request
+  * \retval buf->lb_len fsfilt_ldiskfs_write_handle() returned 0
+  * \retval other       the value returned by fsfilt_ldiskfs_write_handle()
+  */
+ static ssize_t osd_write(const struct lu_env *env, struct dt_object *dt,
+                          const struct lu_buf *buf, loff_t *pos,
+                          struct thandle *handle, struct lustre_capa *capa,
+                          int ignore_quota)
+ {
+         struct inode       *inode = osd_dt_obj(dt)->oo_inode;
+         struct osd_thandle *oh;
+         ssize_t             result;
+ #ifdef HAVE_QUOTA_SUPPORT
+         cfs_cap_t           save = current->cap_effective;
+ #endif
+
+         LASSERT(handle != NULL);
+
+         /*
+          * This function has no ENTRY, so leave with a plain return rather
+          * than RETURN() to keep the entry/exit trace balanced.
+          */
+         if (osd_object_auth(env, dt, capa, CAPA_OPC_BODY_WRITE))
+                 return -EACCES;
+
+         oh = container_of(handle, struct osd_thandle, ot_super);
+         /* Check ot_handle itself before dereferencing it. */
+         LASSERT(oh->ot_handle != NULL);
+         LASSERT(oh->ot_handle->h_transaction != NULL);
+ #ifdef HAVE_QUOTA_SUPPORT
+         if (ignore_quota)
+                 current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK;
+         else
+                 current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
+ #endif
+         result = fsfilt_ldiskfs_write_handle(inode, buf->lb_buf, buf->lb_len,
+                                              pos, oh->ot_handle);
+ #ifdef HAVE_QUOTA_SUPPORT
+         current->cap_effective = save;
+ #endif
+         /* The helper reports success as 0; callers get the byte count. */
+         if (result == 0)
+                 result = buf->lb_len;
+         return result;
+ }
+
+ /* Body operations table: object body I/O goes through osd_read()/osd_write(). */
+ static const struct dt_body_operations osd_body_ops = {
+         .dbo_write = osd_write,
+         .dbo_read  = osd_read,
+ };
+
+
+/**
+ * delete a (key, value) pair from index \a dt specified by \a key
+ *
+ * \param dt_object osd index object
+ * \param key key for index
+ * \param rec record reference
+ * \param handle transaction handler
+ *
+ * \retval 0 success
+ * \retval -ve failure
+ */
+
+static int osd_index_iam_delete(const struct lu_env *env, struct dt_object *dt,
+ const struct dt_key *key, struct thandle *handle,
+ struct lustre_capa *capa)
+{
+ struct osd_object *obj = osd_dt_obj(dt);
+ struct osd_thandle *oh;
+ struct iam_path_descr *ipd;
+ struct iam_container *bag = &obj->oo_dir->od_container;
+ int rc;
+
+ ENTRY;
+
+ LINVRNT(osd_invariant(obj));
+ LASSERT(dt_object_exists(dt));
+ LASSERT(bag->ic_object == obj->oo_inode);
+ LASSERT(handle != NULL);
+
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
+ RETURN(-EACCES);
+
+ ipd = osd_idx_ipd_get(env, bag);
+ if (unlikely(ipd == NULL))
+ RETURN(-ENOMEM);
+
+ oh = container_of0(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle != NULL);
+ LASSERT(oh->ot_handle->h_transaction != NULL);
+
+ rc = iam_delete(oh->ot_handle, bag, (const struct iam_key *)key, ipd);
+ osd_ipd_put(env, bag, ipd);
+ LINVRNT(osd_invariant(obj));
+ RETURN(rc);
+}
+
+/**
+ * Index delete function for interoperability mode (b11826).
+ * It will remove the directory entry added by osd_index_ea_insert().
+ * This entry is needed to maintain name->fid mapping.
+ *
+ * \param key, key i.e. file entry to be deleted
+ *
+ * \retval 0, on success
+ * \retval -ve, on error
+ */
+static int osd_index_ea_delete(const struct lu_env *env, struct dt_object *dt,
+ const struct dt_key *key, struct thandle *handle,
+ struct lustre_capa *capa)
+{
+ struct osd_object *obj = osd_dt_obj(dt);
+ struct inode *dir = obj->oo_inode;
+ struct dentry *dentry;
+ struct osd_thandle *oh;
+ struct ldiskfs_dir_entry_2 *de;
+ struct buffer_head *bh;
+
+ int rc;
+
+ ENTRY;
+
+ LINVRNT(osd_invariant(obj));
+ LASSERT(dt_object_exists(dt));
+ LASSERT(handle != NULL);
+
+ oh = container_of(handle, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle != NULL);
+ LASSERT(oh->ot_handle->h_transaction != NULL);
+
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_DELETE))
+ RETURN(-EACCES);
+
+ dentry = osd_child_dentry_get(env, obj,
+ (char *)key, strlen((char *)key));
+
+ down_write(&obj->oo_ext_idx_sem);
+ bh = ldiskfs_find_entry(dentry, &de);
+ if (bh) {
+ struct osd_thread_info *oti = osd_oti_get(env);
+ struct timespec *ctime = &oti->oti_time;
+ struct timespec *mtime = &oti->oti_time2;
+
+ *ctime = dir->i_ctime;
+ *mtime = dir->i_mtime;
+ rc = ldiskfs_delete_entry(oh->ot_handle,
+ dir, de, bh);
+ /* xtime should not be updated with server-side time. */
+ spin_lock(&obj->oo_guard);
+ dir->i_ctime = *ctime;
+ dir->i_mtime = *mtime;
+ spin_unlock(&obj->oo_guard);
+ mark_inode_dirty(dir);
+ brelse(bh);
+ } else
+ rc = -ENOENT;
+
+ up_write(&obj->oo_ext_idx_sem);
+ LASSERT(osd_invariant(obj));
+ RETURN(rc);
+}
+
+/**
+ * Lookup index for \a key and copy record to \a rec.
+ *
+ * \param dt_object osd index object
+ * \param key key for index
+ * \param rec record reference
+ *
+ * \retval +ve success : exact mach
+ * \retval 0 return record with key not greater than \a key
+ * \retval -ve failure
+ */
+static int osd_index_iam_lookup(const struct lu_env *env, struct dt_object *dt,
+ struct dt_rec *rec, const struct dt_key *key,
+ struct lustre_capa *capa)
+{
+ struct osd_object *obj = osd_dt_obj(dt);
+ struct iam_path_descr *ipd;
+ struct iam_container *bag = &obj->oo_dir->od_container;
+ struct osd_thread_info *oti = osd_oti_get(env);
+ struct iam_iterator *it = &oti->oti_idx_it;
+ int rc;
+ ENTRY;
+
+ LASSERT(osd_invariant(obj));
+ LASSERT(dt_object_exists(dt));
+ LASSERT(bag->ic_object == obj->oo_inode);
+
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_LOOKUP))
+ RETURN(-EACCES);
+
+ ipd = osd_idx_ipd_get(env, bag);
+ if (IS_ERR(ipd))
+ RETURN(-ENOMEM);
+
+ /* got ipd now we can start iterator. */
+ iam_it_init(it, bag, 0, ipd);
+
+ rc = iam_it_get(it, (struct iam_key *)key);
+ if (rc >= 0)
+ iam_reccpy(&it->ii_path.ip_leaf, (struct iam_rec *)rec);
+
+ iam_it_put(it);
+ iam_it_fini(it);
+ osd_ipd_put(env, bag, ipd);
+
+ LINVRNT(osd_invariant(obj));
+
+ RETURN(rc);
+}
+
+/**
+ * Inserts (key, value) pair in \a dt index object.
+ *
+ * \param dt osd index object
+ * \param key key for index
+ * \param rec record reference
+ * \param th transaction handler
+ *
+ * \retval 0 success
+ * \retval -ve failure
+ */
+static int osd_index_iam_insert(const struct lu_env *env, struct dt_object *dt,
+ const struct dt_rec *rec, const struct dt_key *key,
+ struct thandle *th, struct lustre_capa *capa,
+ int ignore_quota)
+{
+ struct osd_object *obj = osd_dt_obj(dt);
+ struct iam_path_descr *ipd;
+ struct osd_thandle *oh;
+ struct iam_container *bag = &obj->oo_dir->od_container;
+#ifdef HAVE_QUOTA_SUPPORT
+ cfs_cap_t save = current->cap_effective;
+#endif
+ int rc;
+
+ ENTRY;
+
+ LINVRNT(osd_invariant(obj));
+ LASSERT(dt_object_exists(dt));
+ LASSERT(bag->ic_object == obj->oo_inode);
+ LASSERT(th != NULL);
+
+ if (osd_object_auth(env, dt, capa, CAPA_OPC_INDEX_INSERT))
+ return -EACCES;
+
+ ipd = osd_idx_ipd_get(env, bag);
+ if (unlikely(ipd == NULL))
+ RETURN(-ENOMEM);
+
+ oh = container_of0(th, struct osd_thandle, ot_super);
+ LASSERT(oh->ot_handle != NULL);
+ LASSERT(oh->ot_handle->h_transaction != NULL);
+#ifdef HAVE_QUOTA_SUPPORT
+ if (ignore_quota)
+ current->cap_effective |= CFS_CAP_SYS_RESOURCE_MASK;
+ else
+ current->cap_effective &= ~CFS_CAP_SYS_RESOURCE_MASK;
+#endif
+ rc = iam_insert(oh->ot_handle, bag, (const struct iam_key *)key,
+ (struct iam_rec *)rec, ipd);
+#ifdef HAVE_QUOTA_SUPPORT
+ current->cap_effective = save;
+#endif
+ osd_ipd_put(env, bag, ipd);
+ LINVRNT(osd_invariant(obj));
+ RETURN(rc);
+}
+
+/**
+ * Calls ldiskfs_add_entry() to add directory entry
+ * into the directory. This is required for
+ * interoperability mode (b11826)
+ *
+ * \retval 0, on success
+ * \retval -ve, on error
+ */
+static int __osd_ea_add_rec(struct osd_thread_info *info,
+ struct osd_object *pobj,
+ struct osd_object *cobj,
+ const char *name,
+ struct thandle *th)
+{
+ struct dentry *child;
+ struct osd_thandle *oth;
+ struct inode *cinode = cobj->oo_inode;
+ int rc;
+
+ oth = container_of(th, struct osd_thandle, ot_super);
+ LASSERT(oth->ot_handle != NULL);
+ LASSERT(oth->ot_handle->h_transaction != NULL);
+
+ child = osd_child_dentry_get(info->oti_env, pobj, name, strlen(name));
+ rc = ldiskfs_add_entry(oth->ot_handle, child, cinode);
+
+ RETURN(rc);
+}
+
+/**
+ * Calls ldiskfs_add_dot_dotdot() to add dot and dotdot entries
+ * into the directory.Also sets flags into osd object to
+ * indicate dot and dotdot are created. This is required for
+ * interoperability mode (b11826)
+ *
+ * \param dir directory for dot and dotdot fixup.
+ * \param obj child object for linking
+ *
+ * \retval 0, on success
+ * \retval -ve, on error
+ */
+static int osd_add_dot_dotdot(struct osd_thread_info *info,
+ struct osd_object *dir,
+ struct osd_object *obj, const char *name,
+ struct thandle *th)
+{
+ struct inode *parent_dir = obj->oo_inode;
+ struct inode *inode = dir->oo_inode;
+ struct osd_thandle *oth;
+ int result = 0;
+
+ oth = container_of(th, struct osd_thandle, ot_super);
+ LASSERT(oth->ot_handle->h_transaction != NULL);
+ LASSERT(S_ISDIR(dir->oo_inode->i_mode));
+
+ if (strcmp(name, dot) == 0) {
+ if (dir->oo_compat_dot_created) {
+ result = -EEXIST;
+ } else {
+ LASSERT(obj == dir);
+ dir->oo_compat_dot_created = 1;
+ result = 0;
+ }
+ } else if(strcmp(name, dotdot) == 0) {
+ if (!dir->oo_compat_dot_created)
+ return -EINVAL;
+ if (dir->oo_compat_dotdot_created)
+ return __osd_ea_add_rec(info, dir, obj, name, th);
+
+ result = ldiskfs_add_dot_dotdot(oth->ot_handle, parent_dir, inode);
+ if (result == 0)
+ dir->oo_compat_dotdot_created = 1;
+ }