+/**
+ * Initialize the OSP object attributes cache.
+ *
+ * \param[in] obj pointer to the OSP object
+ *
+ * \retval 0 for success
+ * \retval negative error number on failure
+ */
+int osp_oac_init(struct osp_object *obj)
+{
+ struct osp_object_attr *ooa;
+
+ OBD_ALLOC_PTR(ooa);
+ if (ooa == NULL)
+ return -ENOMEM;
+
+ INIT_LIST_HEAD(&ooa->ooa_xattr_list);
+ spin_lock(&obj->opo_lock);
+ if (likely(obj->opo_ooa == NULL)) {
+ obj->opo_ooa = ooa;
+ spin_unlock(&obj->opo_lock);
+ } else {
+ spin_unlock(&obj->opo_lock);
+ OBD_FREE_PTR(ooa);
+ }
+
+ return 0;
+}
+
+/**
+ * Find the named extended attribute in the OSP object attributes cache.
+ *
+ * The caller should take the osp_object::opo_lock before calling
+ * this function.
+ *
+ * \param[in] ooa pointer to the OSP object attributes cache
+ * \param[in] name the name of the extended attribute
+ * \param[in] namelen the name length of the extended attribute
+ *
+ * \retval pointer to the found extended attribute entry
+ * \retval NULL if the specified extended attribute is not
+ * in the cache
+ */
+static struct osp_xattr_entry *
+osp_oac_xattr_find_locked(struct osp_object_attr *ooa,
+ const char *name, size_t namelen)
+{
+ struct osp_xattr_entry *oxe;
+
+ list_for_each_entry(oxe, &ooa->ooa_xattr_list, oxe_list) {
+ if (namelen == oxe->oxe_namelen &&
+ strncmp(name, oxe->oxe_buf, namelen) == 0)
+ return oxe;
+ }
+
+ return NULL;
+}
+
+/**
+ * Find the named extended attribute in the OSP object attributes cache.
+ *
+ * Call osp_oac_xattr_find_locked() with the osp_object::opo_lock held.
+ *
+ * \param[in] obj pointer to the OSP object
+ * \param[in] name the name of the extended attribute
+ * \param[in] unlink true if the extended attribute entry is to be removed
+ * from the cache
+ *
+ * \retval pointer to the found extended attribute entry
+ * \retval NULL if the specified extended attribute is not
+ * in the cache
+ */
+static struct osp_xattr_entry *osp_oac_xattr_find(struct osp_object *obj,
+ const char *name, bool unlink)
+{
+ struct osp_xattr_entry *oxe = NULL;
+
+ spin_lock(&obj->opo_lock);
+ if (obj->opo_ooa != NULL) {
+ oxe = osp_oac_xattr_find_locked(obj->opo_ooa, name,
+ strlen(name));
+ if (oxe != NULL) {
+ if (unlink)
+ list_del_init(&oxe->oxe_list);
+ else
+ atomic_inc(&oxe->oxe_ref);
+ }
+ }
+ spin_unlock(&obj->opo_lock);
+
+ return oxe;
+}
+
+/**
+ * Find the named extended attribute in the OSP object attributes cache.
+ *
+ * If it is not in the cache, then add an empty entry (that will be
+ * filled later) to cache with the given name.
+ *
+ * \param[in] obj pointer to the OSP object
+ * \param[in] name the name of the extended attribute
+ * \param[in] len the length of the extended attribute value
+ *
+ * \retval pointer to the found or new-created extended
+ * attribute entry
+ * \retval NULL if the specified extended attribute is not in the
+ * cache or fail to add new empty entry to the cache.
+ */
+static struct osp_xattr_entry *
+osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len)
+{
+ struct osp_object_attr *ooa = obj->opo_ooa;
+ struct osp_xattr_entry *oxe;
+ struct osp_xattr_entry *tmp = NULL;
+ size_t namelen = strlen(name);
+ size_t size = sizeof(*oxe) + namelen + 1 + len;
+
+ LASSERT(ooa != NULL);
+
+ oxe = osp_oac_xattr_find(obj, name, false);
+ if (oxe != NULL)
+ return oxe;
+
+ OBD_ALLOC(oxe, size);
+ if (unlikely(oxe == NULL))
+ return NULL;
+
+ INIT_LIST_HEAD(&oxe->oxe_list);
+ oxe->oxe_buflen = size;
+ oxe->oxe_namelen = namelen;
+ memcpy(oxe->oxe_buf, name, namelen);
+ oxe->oxe_value = oxe->oxe_buf + namelen + 1;
+ /* One ref is for the caller, the other is for the entry on the list. */
+ atomic_set(&oxe->oxe_ref, 2);
+
+ spin_lock(&obj->opo_lock);
+ tmp = osp_oac_xattr_find_locked(ooa, name, namelen);
+ if (tmp == NULL)
+ list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list);
+ else
+ atomic_inc(&tmp->oxe_ref);
+ spin_unlock(&obj->opo_lock);
+
+ if (tmp != NULL) {
+ OBD_FREE(oxe, size);
+ oxe = tmp;
+ }
+
+ return oxe;
+}
+
+/**
+ * Add the given extended attribute to the OSP object attributes cache.
+ *
+ * If there is an old extended attributed entry with the same name,
+ * remove it from the cache and return it via the parameter \a poxe.
+ *
+ * \param[in] obj pointer to the OSP object
+ * \param[in,out] poxe double pointer to the OSP object extended attribute
+ * entry: the new extended attribute entry is transferred
+ * via such pointer target, and if old the extended
+ * attribute entry exists, then it will be returned back
+ * via such pointer target.
+ * \param[in] len the length of the (new) extended attribute value
+ *
+ * \retval pointer to the new extended attribute entry
+ * \retval NULL for failure cases.
+ */
+static struct osp_xattr_entry *
+osp_oac_xattr_replace(struct osp_object *obj,
+ struct osp_xattr_entry **poxe, size_t len)
+{
+ struct osp_object_attr *ooa = obj->opo_ooa;
+ struct osp_xattr_entry *oxe;
+ size_t namelen = (*poxe)->oxe_namelen;
+ size_t size = sizeof(*oxe) + namelen + 1 + len;
+
+ LASSERT(ooa != NULL);
+
+ OBD_ALLOC(oxe, size);
+ if (unlikely(oxe == NULL))
+ return NULL;
+
+ INIT_LIST_HEAD(&oxe->oxe_list);
+ oxe->oxe_buflen = size;
+ oxe->oxe_namelen = namelen;
+ memcpy(oxe->oxe_buf, (*poxe)->oxe_buf, namelen);
+ oxe->oxe_value = oxe->oxe_buf + namelen + 1;
+ /* One ref is for the caller, the other is for the entry on the list. */
+ atomic_set(&oxe->oxe_ref, 2);
+
+ spin_lock(&obj->opo_lock);
+ *poxe = osp_oac_xattr_find_locked(ooa, oxe->oxe_buf, namelen);
+ LASSERT(*poxe != NULL);
+
+ list_del_init(&(*poxe)->oxe_list);
+ list_add_tail(&oxe->oxe_list, &ooa->ooa_xattr_list);
+ spin_unlock(&obj->opo_lock);
+
+ return oxe;
+}
+
+/**
+ * Release reference from the OSP object extended attribute entry.
+ *
+ * If it is the last reference, then free the entry.
+ *
+ * \param[in] oxe pointer to the OSP object extended attribute entry.
+ */
+static inline void osp_oac_xattr_put(struct osp_xattr_entry *oxe)
+{
+ if (atomic_dec_and_test(&oxe->oxe_ref)) {
+ LASSERT(list_empty(&oxe->oxe_list));
+
+ OBD_FREE(oxe, oxe->oxe_buflen);
+ }
+}
+
+/**
+ * Parse the OSP object attribute from the RPC reply.
+ *
+ * If the attribute is valid, then it will be added to the OSP object
+ * attributes cache.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] reply pointer to the RPC reply
+ * \param[in] req pointer to the RPC request
+ * \param[out] attr pointer to buffer to hold the output attribute
+ * \param[in] obj pointer to the OSP object
+ * \param[in] index the index of the attribute buffer in the reply
+ *
+ * \retval 0 for success
+ * \retval negative error number on failure
+ */
+static int osp_get_attr_from_reply(const struct lu_env *env,
+ struct object_update_reply *reply,
+ struct ptlrpc_request *req,
+ struct lu_attr *attr,
+ struct osp_object *obj, int index)
+{
+ struct osp_thread_info *osi = osp_env_info(env);
+ struct lu_buf *rbuf = &osi->osi_lb2;
+ struct obdo *lobdo = &osi->osi_obdo;
+ struct obdo *wobdo;
+ int rc;
+
+ rc = object_update_result_data_get(reply, rbuf, index);
+ if (rc < 0)
+ return rc;
+
+ wobdo = rbuf->lb_buf;
+ if (rbuf->lb_len != sizeof(*wobdo))
+ return -EPROTO;
+
+ LASSERT(req != NULL);
+ if (ptlrpc_req_need_swab(req))
+ lustre_swab_obdo(wobdo);
+
+ lustre_get_wire_obdo(NULL, lobdo, wobdo);
+ spin_lock(&obj->opo_lock);
+ if (obj->opo_ooa != NULL) {
+ la_from_obdo(&obj->opo_ooa->ooa_attr, lobdo, lobdo->o_valid);
+ if (attr != NULL)
+ *attr = obj->opo_ooa->ooa_attr;
+ } else {
+ LASSERT(attr != NULL);
+
+ la_from_obdo(attr, lobdo, lobdo->o_valid);
+ }
+ spin_unlock(&obj->opo_lock);
+
+ return 0;
+}
+
+/**
+ * Interpreter function for getting OSP object attribute asynchronously.
+ *
+ * Called to interpret the result of an async mode RPC for getting the
+ * OSP object attribute.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] reply pointer to the RPC reply
+ * \param[in] req pointer to the RPC request
+ * \param[in] obj pointer to the OSP object
+ * \param[out] data pointer to buffer to hold the output attribute
+ * \param[in] index the index of the attribute buffer in the reply
+ * \param[in] rc the result for handling the RPC
+ *
+ * \retval 0 for success
+ * \retval negative error number on failure
+ */
+static int osp_attr_get_interpterer(const struct lu_env *env,
+ struct object_update_reply *reply,
+ struct ptlrpc_request *req,
+ struct osp_object *obj,
+ void *data, int index, int rc)
+{
+ struct lu_attr *attr = data;
+
+ LASSERT(obj->opo_ooa != NULL);
+
+ if (rc == 0) {
+ osp2lu_obj(obj)->lo_header->loh_attr |= LOHA_EXISTS;
+ obj->opo_non_exist = 0;
+
+ return osp_get_attr_from_reply(env, reply, req, NULL, obj,
+ index);
+ } else {
+ if (rc == -ENOENT) {
+ osp2lu_obj(obj)->lo_header->loh_attr &= ~LOHA_EXISTS;
+ obj->opo_non_exist = 1;
+ }
+
+ spin_lock(&obj->opo_lock);
+ attr->la_valid = 0;
+ spin_unlock(&obj->opo_lock);
+ }
+
+ return 0;
+}
+
+/**
+ * Implement OSP layer dt_object_operations::do_declare_attr_get() interface.
+ *
+ * Declare that the caller will get attribute from the specified OST object.
+ *
+ * This function adds an Object Unified Target (OUT) sub-request to the per-OSP
+ * based shared asynchronous request queue. The osp_attr_get_interpterer()
+ * is registered as the interpreter function to handle the result of this
+ * sub-request.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] dt pointer to the OSP layer dt_object
+ *
+ * \retval 0 for success
+ * \retval negative error number on failure
+ */
+static int osp_declare_attr_get(const struct lu_env *env, struct dt_object *dt)
+{
+ struct osp_object *obj = dt2osp_obj(dt);
+ struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev);
+ int rc = 0;
+
+ if (obj->opo_ooa == NULL) {
+ rc = osp_oac_init(obj);
+ if (rc != 0)
+ return rc;
+ }
+
+ mutex_lock(&osp->opd_async_requests_mutex);
+ rc = osp_insert_async_request(env, OUT_ATTR_GET, obj, 0, NULL, NULL,
+ &obj->opo_ooa->ooa_attr,
+ sizeof(struct obdo),
+ osp_attr_get_interpterer);
+ mutex_unlock(&osp->opd_async_requests_mutex);
+
+ return rc;
+}
+
+/**
+ * Implement OSP layer dt_object_operations::do_attr_get() interface.
+ *
+ * Get attribute from the specified MDT/OST object.
+ *
+ * If the attribute is in the OSP object attributes cache, then return
+ * the cached attribute directly. Otherwise it will trigger an OUT RPC
+ * to the peer to get the attribute synchronously, if successful, add it
+ * to the OSP attributes cache. (\see lustre/osp/osp_trans.c for OUT RPC.)
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] dt pointer to the OSP layer dt_object
+ * \param[out] attr pointer to the buffer to hold the output attribute
+ *
+ * \retval 0 for success
+ * \retval negative error number on failure
+ */
+int osp_attr_get(const struct lu_env *env, struct dt_object *dt,
+ struct lu_attr *attr)
+{
+ struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev);
+ struct osp_object *obj = dt2osp_obj(dt);
+ struct dt_device *dev = &osp->opd_dt_dev;
+ struct osp_update_request *update;
+ struct object_update_reply *reply;
+ struct ptlrpc_request *req = NULL;
+ int rc = 0;
+ ENTRY;
+
+ if (is_ost_obj(&dt->do_lu) && obj->opo_non_exist)
+ RETURN(-ENOENT);
+
+ if (obj->opo_ooa != NULL) {
+ spin_lock(&obj->opo_lock);
+ if (obj->opo_ooa->ooa_attr.la_valid != 0 && !obj->opo_stale) {
+ *attr = obj->opo_ooa->ooa_attr;
+ spin_unlock(&obj->opo_lock);
+
+ RETURN(0);
+ }
+ spin_unlock(&obj->opo_lock);
+ }
+
+ update = osp_update_request_create(dev);
+ if (IS_ERR(update))
+ RETURN(PTR_ERR(update));
+
+ rc = osp_update_rpc_pack(env, attr_get, update, OUT_ATTR_GET,
+ lu_object_fid(&dt->do_lu));
+ if (rc != 0) {
+ CERROR("%s: Insert update error "DFID": rc = %d\n",
+ dev->dd_lu_dev.ld_obd->obd_name,
+ PFID(lu_object_fid(&dt->do_lu)), rc);
+
+ GOTO(out, rc);
+ }
+
+ rc = osp_remote_sync(env, osp, update, &req);
+ if (rc != 0) {
+ if (rc == -ENOENT) {
+ osp2lu_obj(obj)->lo_header->loh_attr &= ~LOHA_EXISTS;
+ obj->opo_non_exist = 1;
+ } else {
+ CERROR("%s:osp_attr_get update error "DFID": rc = %d\n",
+ dev->dd_lu_dev.ld_obd->obd_name,
+ PFID(lu_object_fid(&dt->do_lu)), rc);
+ }
+
+ GOTO(out, rc);
+ }
+
+ osp2lu_obj(obj)->lo_header->loh_attr |= LOHA_EXISTS;
+ obj->opo_non_exist = 0;
+ reply = req_capsule_server_sized_get(&req->rq_pill,
+ &RMF_OUT_UPDATE_REPLY,
+ OUT_UPDATE_REPLY_SIZE);
+ if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC)
+ GOTO(out, rc = -EPROTO);
+
+ rc = osp_get_attr_from_reply(env, reply, req, attr, obj, 0);
+ if (rc != 0)
+ GOTO(out, rc);
+
+ spin_lock(&obj->opo_lock);
+ if (obj->opo_stale)
+ obj->opo_stale = 0;
+ spin_unlock(&obj->opo_lock);
+
+ GOTO(out, rc);
+
+out:
+ if (req != NULL)
+ ptlrpc_req_finished(req);
+
+ osp_update_request_destroy(update);
+
+ return rc;
+}
+
+/**
+ * Implement OSP layer dt_object_operations::do_declare_attr_set() interface.
+ *
+ * If the transaction is not remote one, then declare the credits that will
+ * be used for the subsequent llog record for the object's attributes.
+ *
+ * \param[in] env pointer to the thread context
+ * \param[in] dt pointer to the OSP layer dt_object
+ * \param[in] attr pointer to the attribute to be set
+ * \param[in] th pointer to the transaction handler
+ *
+ * \retval 0 for success
+ * \retval negative error number on failure
+ */