Whamcloud - gitweb
LU-13974 llog: check stale osp object
[fs/lustre-release.git] / lustre / osp / osp_object.c
index e327cbb..bd47f5b 100644 (file)
@@ -186,7 +186,7 @@ static inline void osp_oac_xattr_put(struct osp_xattr_entry *oxe)
        if (atomic_dec_and_test(&oxe->oxe_ref)) {
                LASSERT(list_empty(&oxe->oxe_list));
 
-               OBD_FREE(oxe, oxe->oxe_buflen);
+               OBD_FREE_LARGE(oxe, oxe->oxe_buflen);
        }
 }
 
@@ -279,7 +279,7 @@ osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len)
        if (oxe)
                return oxe;
 
-       OBD_ALLOC(oxe, size);
+       OBD_ALLOC_LARGE(oxe, size);
        if (unlikely(!oxe))
                return NULL;
 
@@ -300,7 +300,7 @@ osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len)
        spin_unlock(&obj->opo_lock);
 
        if (tmp) {
-               OBD_FREE(oxe, size);
+               OBD_FREE_LARGE(oxe, size);
                oxe = tmp;
        }
 
@@ -332,7 +332,7 @@ osp_oac_xattr_assignment(struct osp_object *obj, struct osp_xattr_entry *oxe,
        bool unlink_only = false;
 
        if (oxe->oxe_buflen < size) {
-               OBD_ALLOC(new, size);
+               OBD_ALLOC_LARGE(new, size);
                if (likely(new)) {
                        INIT_LIST_HEAD(&new->oxe_list);
                        new->oxe_buflen = size;
@@ -1366,6 +1366,17 @@ int osp_invalidate(const struct lu_env *env, struct dt_object *dt)
        RETURN(0);
 }
 
+bool osp_check_stale(struct dt_object *dt)
+{
+       struct osp_object *obj = dt2osp_obj(dt);
+
+       if (is_ost_obj(&dt->do_lu) && obj->opo_non_exist)
+               return true;
+
+       return obj->opo_stale;
+}
+
+
 /**
  * Implement OSP layer dt_object_operations::do_declare_create() interface.
  *
@@ -1434,10 +1445,8 @@ static int osp_declare_create(const struct lu_env *env, struct dt_object *dt,
 
        if (unlikely(!fid_is_zero(fid))) {
                /* replay case: caller knows fid */
-               osi->osi_off = sizeof(osi->osi_id) * d->opd_index;
-               osi->osi_lb.lb_len = sizeof(osi->osi_id);
-               osi->osi_lb.lb_buf = NULL;
-
+               osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off, NULL,
+                                  d->opd_index);
                rc = dt_declare_record_write(env, d->opd_last_used_oid_file,
                                             &osi->osi_lb, osi->osi_off,
                                             local_th);
@@ -1461,9 +1470,8 @@ static int osp_declare_create(const struct lu_env *env, struct dt_object *dt,
                o->opo_reserved = 1;
 
                /* common for all OSPs file hystorically */
-               osi->osi_off = sizeof(osi->osi_id) * d->opd_index;
-               osi->osi_lb.lb_len = sizeof(osi->osi_id);
-               osi->osi_lb.lb_buf = NULL;
+               osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off, NULL,
+                                  d->opd_index);
                rc = dt_declare_record_write(env, d->opd_last_used_oid_file,
                                             &osi->osi_lb, osi->osi_off,
                                             local_th);
@@ -1506,7 +1514,6 @@ static int osp_create(const struct lu_env *env, struct dt_object *dt,
        int                     rc = 0;
        struct lu_fid           *fid = &osi->osi_fid;
        struct thandle          *local_th;
-       struct lu_fid           *last_fid = &d->opd_last_used_fid;
        ENTRY;
 
        if (is_only_remote_trans(th) &&
@@ -1586,13 +1593,8 @@ static int osp_create(const struct lu_env *env, struct dt_object *dt,
 
        /* Only need update last_used oid file, seq file will only be update
         * during seq rollover */
-       if (fid_is_idif((last_fid)))
-               osi->osi_id = fid_idif_id(fid_seq(last_fid),
-                                         fid_oid(last_fid), fid_ver(last_fid));
-       else
-               osi->osi_id = fid_oid(last_fid);
        osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off,
-                          &osi->osi_id, d->opd_index);
+                          &d->opd_last_id, d->opd_index);
 
        rc = dt_record_write(env, d->opd_last_used_oid_file, &osi->osi_lb,
                             &osi->osi_off, local_th);
@@ -1770,7 +1772,7 @@ void osp_it_fini(const struct lu_env *env, struct dt_it *di)
                                __free_page(pages[i]);
                        }
                }
-               OBD_FREE(pages, npages * sizeof(*pages));
+               OBD_FREE_PTR_ARRAY(pages, npages);
        }
        OBD_FREE_PTR(it);
 }
@@ -1804,7 +1806,7 @@ static int osp_it_fetch(const struct lu_env *env, struct osp_it *it)
        npages = min_t(unsigned int, OFD_MAX_BRW_SIZE, 1 << 20);
        npages /= PAGE_SIZE;
 
-       OBD_ALLOC(pages, npages * sizeof(*pages));
+       OBD_ALLOC_PTR_ARRAY(pages, npages);
        if (pages == NULL)
                RETURN(-ENOMEM);
 
@@ -1853,7 +1855,7 @@ static int osp_it_fetch(const struct lu_env *env, struct osp_it *it)
        ptlrpc_at_set_req_timeout(req);
 
        desc = ptlrpc_prep_bulk_imp(req, npages, 1,
-                                   PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV,
+                                   PTLRPC_BULK_PUT_SINK,
                                    MDS_BULK_PORTAL,
                                    &ptlrpc_bulk_kiov_pin_ops);
        if (desc == NULL)
@@ -1980,7 +1982,7 @@ again0:
                        if (pages[i] != NULL)
                                __free_page(pages[i]);
                }
-               OBD_FREE(pages, it->ooi_total_npages * sizeof(*pages));
+               OBD_FREE_PTR_ARRAY(pages, it->ooi_total_npages);
 
                it->ooi_pos_page = 0;
                it->ooi_total_npages = 0;
@@ -2281,6 +2283,14 @@ static int osp_object_init(const struct lu_env *env, struct lu_object *o,
        RETURN(rc);
 }
 
+static void osp_object_free_rcu(struct rcu_head *head)
+{
+       struct osp_object *obj = container_of(head, struct osp_object,
+                                             opo_header.loh_rcu);
+
+       kmem_cache_free(osp_object_kmem, obj);
+}
+
 /**
  * Implement OSP layer lu_object_operations::loo_object_free() interface.
  *
@@ -2309,9 +2319,10 @@ static void osp_object_free(const struct lu_env *env, struct lu_object *o)
                         "Still has %d users on the xattr entry %.*s\n",
                         count-1, (int)oxe->oxe_namelen, oxe->oxe_buf);
 
-               OBD_FREE(oxe, oxe->oxe_buflen);
+               OBD_FREE_LARGE(oxe, oxe->oxe_buflen);
        }
-       OBD_SLAB_FREE_PTR(obj, osp_object_kmem);
+       OBD_FREE_PRE(obj, sizeof(*obj), "slab-freed");
+       call_rcu(&obj->opo_header.loh_rcu, osp_object_free_rcu);
 }
 
 /**