+static int osp_xattr_get_interpterer(const struct lu_env *env,
+ struct update_reply *reply,
+ struct osp_object *obj,
+ void *data, int index, int rc)
+{
+ struct osp_object_attr *ooa = obj->opo_ooa;
+ struct osp_xattr_entry *oxe = data;
+ struct lu_buf *rbuf = &osp_env_info(env)->osi_lb2;
+
+ LASSERT(ooa != NULL);
+
+ if (rc == 0) {
+ int len = sizeof(*oxe) + oxe->oxe_namelen + 1;
+
+ rc = update_get_reply_buf(reply, rbuf, index);
+ if (rc < 0 || rbuf->lb_len > (oxe->oxe_buflen - len)) {
+ spin_lock(&obj->opo_lock);
+ oxe->oxe_ready = 0;
+ spin_unlock(&obj->opo_lock);
+ osp_oac_xattr_put(oxe);
+
+ return rc < 0 ? rc : -ERANGE;
+ }
+
+ spin_lock(&obj->opo_lock);
+ oxe->oxe_vallen = rbuf->lb_len;
+ memcpy(oxe->oxe_value, rbuf->lb_buf, rbuf->lb_len);
+ oxe->oxe_exist = 1;
+ oxe->oxe_ready = 1;
+ spin_unlock(&obj->opo_lock);
+ } else if (rc == -ENOENT || rc == -ENODATA) {
+ spin_lock(&obj->opo_lock);
+ oxe->oxe_exist = 0;
+ oxe->oxe_ready = 1;
+ spin_unlock(&obj->opo_lock);
+ } else {
+ spin_lock(&obj->opo_lock);
+ oxe->oxe_ready = 0;
+ spin_unlock(&obj->opo_lock);
+ }
+
+ osp_oac_xattr_put(oxe);
+
+ return 0;
+}
+
+static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt,
+ struct lu_buf *buf, const char *name,
+ struct lustre_capa *capa)
+{
+ struct osp_object *obj = dt2osp_obj(dt);
+ struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev);
+ struct update_request *update;
+ struct osp_xattr_entry *oxe;
+ int namelen = strlen(name);
+ int rc = 0;
+
+ LASSERT(buf != NULL);
+ LASSERT(name != NULL);
+
+ /* If only for xattr size, return directly. */
+ if (unlikely(buf->lb_len == 0))
+ return 0;
+
+ if (obj->opo_ooa == NULL) {
+ rc = osp_oac_init(obj);
+ if (rc != 0)
+ return rc;
+ }
+
+ oxe = osp_oac_xattr_find_or_add(obj, name, buf->lb_len);
+ if (oxe == NULL)
+ return -ENOMEM;
+
+ mutex_lock(&osp->opd_async_requests_mutex);
+ update = osp_find_or_create_async_update_request(osp);
+ if (IS_ERR(update)) {
+ rc = PTR_ERR(update);
+ mutex_unlock(&osp->opd_async_requests_mutex);
+ osp_oac_xattr_put(oxe);
+ } else {
+ rc = osp_insert_async_update(env, update, OBJ_XATTR_GET, obj,
+ 1, &namelen, &name, oxe,
+ osp_xattr_get_interpterer);
+ if (rc != 0) {
+ mutex_unlock(&osp->opd_async_requests_mutex);
+ osp_oac_xattr_put(oxe);
+ } else {
+ /* XXX: Currently, we trigger the batched async OUT
+ * RPC via dt_declare_xattr_get(). It is not
+ * perfect solution, but works well now.
+ *
+ * We will improve it in the future. */
+ update = osp->opd_async_requests;
+ if (update != NULL && update->ur_buf != NULL &&
+ update->ur_buf->ub_count > 0) {
+ osp->opd_async_requests = NULL;
+ mutex_unlock(&osp->opd_async_requests_mutex);
+ rc = osp_unplug_async_update(env, osp, update);
+ } else {
+ mutex_unlock(&osp->opd_async_requests_mutex);
+ }
+ }
+ }
+
+ return rc;
+}
+
+int osp_xattr_get(const struct lu_env *env, struct dt_object *dt,
+ struct lu_buf *buf, const char *name,
+ struct lustre_capa *capa)
+{
+ struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev);
+ struct osp_object *obj = dt2osp_obj(dt);
+ struct dt_device *dev = &osp->opd_dt_dev;
+ struct lu_buf *rbuf = &osp_env_info(env)->osi_lb2;
+ struct update_request *update = NULL;
+ struct ptlrpc_request *req = NULL;
+ struct update_reply *reply;
+ struct osp_xattr_entry *oxe = NULL;
+ const char *dname = dt->do_lu.lo_dev->ld_obd->obd_name;
+ int namelen;
+ int rc = 0;
+ ENTRY;
+
+ LASSERT(buf != NULL);
+ LASSERT(name != NULL);
+
+ if (unlikely(obj->opo_non_exist))
+ RETURN(-ENOENT);
+
+ oxe = osp_oac_xattr_find(obj, name);
+ if (oxe != NULL) {
+ spin_lock(&obj->opo_lock);
+ if (oxe->oxe_ready) {
+ if (!oxe->oxe_exist)
+ GOTO(unlock, rc = -ENODATA);
+
+ if (buf->lb_buf == NULL)
+ GOTO(unlock, rc = oxe->oxe_vallen);
+
+ if (buf->lb_len < oxe->oxe_vallen)
+ GOTO(unlock, rc = -ERANGE);
+
+ memcpy(buf->lb_buf, oxe->oxe_value, oxe->oxe_vallen);
+
+ GOTO(unlock, rc = oxe->oxe_vallen);
+
+unlock:
+ spin_unlock(&obj->opo_lock);
+ osp_oac_xattr_put(oxe);
+
+ return rc;
+ }
+ spin_unlock(&obj->opo_lock);
+ }
+
+ update = out_create_update_req(dev);
+ if (IS_ERR(update))
+ GOTO(out, rc = PTR_ERR(update));
+
+ namelen = strlen(name);
+ rc = out_insert_update(env, update, OBJ_XATTR_GET,
+ lu_object_fid(&dt->do_lu), 1, &namelen, &name);
+ if (rc != 0) {
+ CERROR("%s: Insert update error "DFID": rc = %d\n",
+ dname, PFID(lu_object_fid(&dt->do_lu)), rc);
+
+ GOTO(out, rc);
+ }
+
+ rc = out_remote_sync(env, osp->opd_obd->u.cli.cl_import, update, &req);
+ if (rc != 0) {
+ if (obj->opo_ooa == NULL)
+ GOTO(out, rc);
+
+ if (oxe == NULL)
+ oxe = osp_oac_xattr_find_or_add(obj, name, buf->lb_len);
+
+ if (oxe == NULL) {
+ CWARN("%s: Fail to add xattr (%s) to cache for "
+ DFID" (1): rc = %d\n", dname, name,
+ PFID(lu_object_fid(&dt->do_lu)), rc);
+
+ GOTO(out, rc);
+ }
+
+ spin_lock(&obj->opo_lock);
+ if (rc == -ENOENT || rc == -ENODATA) {
+ oxe->oxe_exist = 0;
+ oxe->oxe_ready = 1;
+ } else {
+ oxe->oxe_ready = 0;
+ }
+ spin_unlock(&obj->opo_lock);
+
+ GOTO(out, rc);
+ }
+
+ reply = req_capsule_server_sized_get(&req->rq_pill, &RMF_UPDATE_REPLY,
+ UPDATE_BUFFER_SIZE);
+ if (reply->ur_version != UPDATE_REPLY_V1) {
+ CERROR("%s: Wrong version %x expected %x "DFID": rc = %d\n",
+ dname, reply->ur_version, UPDATE_REPLY_V1,
+ PFID(lu_object_fid(&dt->do_lu)), -EPROTO);
+
+ GOTO(out, rc = -EPROTO);
+ }
+
+ rc = update_get_reply_buf(reply, rbuf, 0);
+ if (rc < 0)
+ GOTO(out, rc);
+
+ LASSERT(rbuf->lb_len > 0 && rbuf->lb_len < PAGE_CACHE_SIZE);
+
+ if (buf->lb_buf == NULL)
+ GOTO(out, rc = rbuf->lb_len);
+
+ if (unlikely(buf->lb_len < rbuf->lb_len))
+ GOTO(out, rc = -ERANGE);
+
+ memcpy(buf->lb_buf, rbuf->lb_buf, rbuf->lb_len);
+ rc = rbuf->lb_len;
+ if (obj->opo_ooa == NULL)
+ GOTO(out, rc);
+
+ if (oxe == NULL) {
+ oxe = osp_oac_xattr_find_or_add(obj, name, rbuf->lb_len);
+ if (oxe == NULL) {
+ CWARN("%s: Fail to add xattr (%s) to "
+ "cache for "DFID" (2): rc = %d\n",
+ dname, name, PFID(lu_object_fid(&dt->do_lu)), rc);
+
+ GOTO(out, rc);
+ }
+ }
+
+ if (oxe->oxe_buflen - oxe->oxe_namelen - 1 < rbuf->lb_len) {
+ struct osp_xattr_entry *old = oxe;
+ struct osp_xattr_entry *tmp;
+
+ tmp = osp_oac_xattr_replace(obj, &old, rbuf->lb_len);
+ osp_oac_xattr_put(oxe);
+ oxe = tmp;
+ if (tmp == NULL) {
+ CWARN("%s: Fail to update xattr (%s) to "
+ "cache for "DFID": rc = %d\n",
+ dname, name, PFID(lu_object_fid(&dt->do_lu)), rc);
+ spin_lock(&obj->opo_lock);
+ oxe->oxe_ready = 0;
+ spin_unlock(&obj->opo_lock);
+
+ GOTO(out, rc);
+ }
+
+ /* Drop the ref for entry on list. */
+ osp_oac_xattr_put(old);
+ }
+
+ spin_lock(&obj->opo_lock);
+ oxe->oxe_vallen = rbuf->lb_len;
+ memcpy(oxe->oxe_value, rbuf->lb_buf, rbuf->lb_len);
+ oxe->oxe_exist = 1;
+ oxe->oxe_ready = 1;
+ spin_unlock(&obj->opo_lock);
+
+ GOTO(out, rc);
+
+out:
+ if (req != NULL)
+ ptlrpc_req_finished(req);
+
+ if (update != NULL && !IS_ERR(update))
+ out_destroy_update_req(update);
+
+ if (oxe != NULL)
+ osp_oac_xattr_put(oxe);
+
+ return rc;
+}
+
+int osp_declare_xattr_set(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_buf *buf, const char *name,
+ int flag, struct thandle *th)
+{
+ struct osp_object *o = dt2osp_obj(dt);
+ struct update_request *update;
+ struct lu_fid *fid;
+ struct osp_xattr_entry *oxe;
+ int sizes[3] = {strlen(name), buf->lb_len,
+ sizeof(int)};
+ char *bufs[3] = {(char *)name, (char *)buf->lb_buf };
+ int rc;
+
+ LASSERT(buf->lb_len > 0 && buf->lb_buf != NULL);
+
+ update = out_find_create_update_loc(th, dt);
+ if (IS_ERR(update)) {
+ CERROR("%s: Get OSP update buf failed "DFID": rc = %d\n",
+ dt->do_lu.lo_dev->ld_obd->obd_name,
+ PFID(lu_object_fid(&dt->do_lu)),
+ (int)PTR_ERR(update));
+
+ return PTR_ERR(update);
+ }
+
+ flag = cpu_to_le32(flag);
+ bufs[2] = (char *)&flag;
+
+ fid = (struct lu_fid *)lu_object_fid(&dt->do_lu);
+ rc = out_insert_update(env, update, OBJ_XATTR_SET, fid,
+ ARRAY_SIZE(sizes), sizes, (const char **)bufs);
+ if (rc != 0 || o->opo_ooa == NULL)
+ return rc;
+
+ oxe = osp_oac_xattr_find_or_add(o, name, buf->lb_len);
+ if (oxe == NULL) {
+ CWARN("%s: Fail to add xattr (%s) to cache for "DFID
+ ": rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name,
+ name, PFID(lu_object_fid(&dt->do_lu)), rc);
+
+ return 0;
+ }
+
+ if (oxe->oxe_buflen - oxe->oxe_namelen - 1 < buf->lb_len) {
+ struct osp_xattr_entry *old = oxe;
+ struct osp_xattr_entry *tmp;
+
+ tmp = osp_oac_xattr_replace(o, &old, buf->lb_len);
+ osp_oac_xattr_put(oxe);
+ oxe = tmp;
+ if (tmp == NULL) {
+ CWARN("%s: Fail to update xattr (%s) to cache for "DFID
+ ": rc = %d\n", dt->do_lu.lo_dev->ld_obd->obd_name,
+ name, PFID(lu_object_fid(&dt->do_lu)), rc);
+ spin_lock(&o->opo_lock);
+ oxe->oxe_ready = 0;
+ spin_unlock(&o->opo_lock);
+
+ return 0;
+ }
+
+ /* Drop the ref for entry on list. */
+ osp_oac_xattr_put(old);
+ }
+
+ spin_lock(&o->opo_lock);
+ oxe->oxe_vallen = buf->lb_len;
+ memcpy(oxe->oxe_value, buf->lb_buf, buf->lb_len);
+ oxe->oxe_exist = 1;
+ oxe->oxe_ready = 1;
+ spin_unlock(&o->opo_lock);
+ osp_oac_xattr_put(oxe);
+
+ return 0;
+}
+
+int osp_xattr_set(const struct lu_env *env, struct dt_object *dt,
+ const struct lu_buf *buf, const char *name, int fl,
+ struct thandle *th, struct lustre_capa *capa)
+{
+ CDEBUG(D_INFO, "xattr %s set object "DFID"\n", name,
+ PFID(&dt->do_lu.lo_header->loh_fid));
+
+ return 0;
+}
+