X-Git-Url: https://git.whamcloud.com/?a=blobdiff_plain;f=lustre%2Fosp%2Fosp_object.c;h=bd47f5b6775b192078d0c0585d28c2e04b686f0d;hb=82c6e42d6137f39a1f2394b7bc6e8d600eb36181;hp=3e3b005befb4a8ef5eb726ec815f32eb3e4d3b35;hpb=783fccdde2e947628c7ee0eaa0c6bf86fd65e2e8;p=fs%2Flustre-release.git diff --git a/lustre/osp/osp_object.c b/lustre/osp/osp_object.c index 3e3b005..bd47f5b 100644 --- a/lustre/osp/osp_object.c +++ b/lustre/osp/osp_object.c @@ -23,7 +23,7 @@ * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved. * Use is subject to license terms. * - * Copyright (c) 2012, 2016, Intel Corporation. + * Copyright (c) 2012, 2017, Intel Corporation. */ /* * lustre/osp/osp_object.c @@ -186,7 +186,7 @@ static inline void osp_oac_xattr_put(struct osp_xattr_entry *oxe) if (atomic_dec_and_test(&oxe->oxe_ref)) { LASSERT(list_empty(&oxe->oxe_list)); - OBD_FREE(oxe, oxe->oxe_buflen); + OBD_FREE_LARGE(oxe, oxe->oxe_buflen); } } @@ -279,7 +279,7 @@ osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len) if (oxe) return oxe; - OBD_ALLOC(oxe, size); + OBD_ALLOC_LARGE(oxe, size); if (unlikely(!oxe)) return NULL; @@ -300,7 +300,7 @@ osp_oac_xattr_find_or_add(struct osp_object *obj, const char *name, size_t len) spin_unlock(&obj->opo_lock); if (tmp) { - OBD_FREE(oxe, size); + OBD_FREE_LARGE(oxe, size); oxe = tmp; } @@ -332,7 +332,7 @@ osp_oac_xattr_assignment(struct osp_object *obj, struct osp_xattr_entry *oxe, bool unlink_only = false; if (oxe->oxe_buflen < size) { - OBD_ALLOC(new, size); + OBD_ALLOC_LARGE(new, size); if (likely(new)) { INIT_LIST_HEAD(&new->oxe_list); new->oxe_buflen = size; @@ -431,11 +431,13 @@ static int osp_get_attr_from_reply(const struct lu_env *env, lustre_swab_obdo(wobdo); lustre_get_wire_obdo(NULL, lobdo, wobdo); - spin_lock(&obj->opo_lock); - la_from_obdo(&obj->opo_attr, lobdo, lobdo->o_valid); - if (attr != NULL) - *attr = obj->opo_attr; - spin_unlock(&obj->opo_lock); + if (obj) { + spin_lock(&obj->opo_lock); + la_from_obdo(&obj->opo_attr, lobdo, lobdo->o_valid); + spin_unlock(&obj->opo_lock); + } + if (attr) + la_from_obdo(attr, lobdo, lobdo->o_valid); return 0; } @@ -542,7 +544,7 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt, struct osp_update_request *update; struct object_update_reply *reply; struct ptlrpc_request *req = NULL; - int rc = 0; + int invalidated, cache = 0, rc = 0; ENTRY; if (is_ost_obj(&dt->do_lu) && obj->opo_non_exist) @@ -561,21 +563,31 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt, if (IS_ERR(update)) RETURN(PTR_ERR(update)); - rc = osp_update_rpc_pack(env, attr_get, update, OUT_ATTR_GET, + rc = OSP_UPDATE_RPC_PACK(env, out_attr_get_pack, update, lu_object_fid(&dt->do_lu)); if (rc != 0) { CERROR("%s: Insert update error "DFID": rc = %d\n", dev->dd_lu_dev.ld_obd->obd_name, PFID(lu_object_fid(&dt->do_lu)), rc); - GOTO(out, rc); + GOTO(out_req, rc); } + invalidated = atomic_read(&obj->opo_invalidate_seq); + rc = osp_remote_sync(env, osp, update, &req); + + down_read(&obj->opo_invalidate_sem); + if (invalidated == atomic_read(&obj->opo_invalidate_seq)) { + /* no invalited has came so far, we can cache the attrs */ + cache = 1; + } + if (rc != 0) { if (rc == -ENOENT) { osp2lu_obj(obj)->lo_header->loh_attr &= ~LOHA_EXISTS; - obj->opo_non_exist = 1; + if (cache) + obj->opo_non_exist = 1; } else { CERROR("%s:osp_attr_get update error "DFID": rc = %d\n", dev->dd_lu_dev.ld_obd->obd_name, @@ -593,17 +605,22 @@ int osp_attr_get(const struct lu_env *env, struct dt_object *dt, if (reply == NULL || reply->ourp_magic != UPDATE_REPLY_MAGIC) GOTO(out, rc = -EPROTO); - rc = osp_get_attr_from_reply(env, reply, req, attr, obj, 0); + rc = osp_get_attr_from_reply(env, reply, req, attr, + cache ? obj : NULL, 0); if (rc != 0) GOTO(out, rc); spin_lock(&obj->opo_lock); - obj->opo_stale = 0; + if (cache) + obj->opo_stale = 0; spin_unlock(&obj->opo_lock); GOTO(out, rc); out: + up_read(&obj->opo_invalidate_sem); + +out_req: if (req != NULL) ptlrpc_req_finished(req); @@ -667,10 +684,10 @@ static int osp_declare_attr_set(const struct lu_env *env, struct dt_object *dt, RETURN(rc); } - if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID))) + if (!(attr->la_valid & LA_REMOTE_ATTR_SET)) RETURN(0); - /* track all UID/GID changes via llog */ + /* track all UID/GID, projid, and layout version changes via llog */ rc = osp_sync_declare_add(env, o, MDS_SETATTR64_REC, th); return 0; @@ -704,13 +721,41 @@ static int osp_attr_set(const struct lu_env *env, struct dt_object *dt, int rc = 0; ENTRY; - /* we're interested in uid/gid/projid changes only */ - if (!(attr->la_valid & (LA_UID | LA_GID | LA_PROJID))) + /* we're interested in uid/gid/projid/layout version changes only */ + if (!(attr->la_valid & LA_REMOTE_ATTR_SET)) RETURN(0); if (!is_only_remote_trans(th)) { - rc = osp_sync_add(env, o, MDS_SETATTR64_REC, th, attr); - /* XXX: send new uid/gid to OST ASAP? */ + if (attr->la_flags & LUSTRE_SET_SYNC_FL) { + struct ptlrpc_request *req = NULL; + struct osp_update_request *update = NULL; + struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); + + update = osp_update_request_create(&osp->opd_dt_dev); + if (IS_ERR(update)) + RETURN(PTR_ERR(update)); + + rc = OSP_UPDATE_RPC_PACK(env, out_attr_set_pack, update, + lu_object_fid(&dt->do_lu), + attr); + if (rc != 0) { + CERROR("%s: update error "DFID": rc = %d\n", + osp->opd_obd->obd_name, + PFID(lu_object_fid(&dt->do_lu)), rc); + + osp_update_request_destroy(env, update); + RETURN(rc); + } + + rc = osp_remote_sync(env, osp, update, &req); + if (req != NULL) + ptlrpc_req_finished(req); + + osp_update_request_destroy(env, update); + } else { + rc = osp_sync_add(env, o, MDS_SETATTR64_REC, th, attr); + /* XXX: send new uid/gid to OST ASAP? */ + } } else { struct lu_attr *la; @@ -769,42 +814,36 @@ static int osp_xattr_get_interpterer(const struct lu_env *env, void *data, int index, int rc) { struct osp_xattr_entry *oxe = data; - struct lu_buf *rbuf = &osp_env_info(env)->osi_lb2; - if (!rc) { + spin_lock(&obj->opo_lock); + if (rc >= 0) { + struct lu_buf *rbuf = &osp_env_info(env)->osi_lb2; size_t len = sizeof(*oxe) + oxe->oxe_namelen + 1; rc = object_update_result_data_get(reply, rbuf, index); - spin_lock(&obj->opo_lock); - if (rc < 0 || rbuf->lb_len == 0 || - rbuf->lb_len > (oxe->oxe_buflen - len)) { - if (unlikely(rc == -ENODATA)) { - oxe->oxe_exist = 0; - oxe->oxe_ready = 1; - } else { - oxe->oxe_ready = 0; - } - spin_unlock(&obj->opo_lock); - /* Put the reference obtained in the - * osp_declare_xattr_get(). */ - osp_oac_xattr_put(oxe); + if (rc == -ENOENT || rc == -ENODATA || rc == 0) { + oxe->oxe_exist = 0; + oxe->oxe_ready = 1; + goto unlock; + } - return rc < 0 ? rc : -ERANGE; + if (unlikely(rc < 0) || + rbuf->lb_len > (oxe->oxe_buflen - len)) { + oxe->oxe_ready = 0; + goto unlock; } __osp_oac_xattr_assignment(obj, oxe, rbuf); - spin_unlock(&obj->opo_lock); } else if (rc == -ENOENT || rc == -ENODATA) { - spin_lock(&obj->opo_lock); oxe->oxe_exist = 0; oxe->oxe_ready = 1; - spin_unlock(&obj->opo_lock); } else { - spin_lock(&obj->opo_lock); oxe->oxe_ready = 0; - spin_unlock(&obj->opo_lock); } +unlock: + spin_unlock(&obj->opo_lock); + /* Put the reference obtained in the osp_declare_xattr_get(). */ osp_oac_xattr_put(oxe); @@ -835,8 +874,8 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt, struct osp_object *obj = dt2osp_obj(dt); struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); struct osp_xattr_entry *oxe; - __u16 namelen; int rc = 0; + __u16 len; LASSERT(buf != NULL); LASSERT(name != NULL); @@ -848,10 +887,10 @@ static int osp_declare_xattr_get(const struct lu_env *env, struct dt_object *dt, if (oxe == NULL) return -ENOMEM; - namelen = strlen(name); + len = strlen(name) + 1; mutex_lock(&osp->opd_async_requests_mutex); rc = osp_insert_async_request(env, OUT_XATTR_GET, obj, 1, - &namelen, (const void **)&name, + &len, (const void **)&name, oxe, buf->lb_len, osp_xattr_get_interpterer); if (rc != 0) { @@ -916,7 +955,7 @@ int osp_xattr_get(const struct lu_env *env, struct dt_object *dt, struct object_update_reply *reply; struct osp_xattr_entry *oxe = NULL; const char *dname = osp_dto2name(obj); - int rc = 0; + int invalidated, rc = 0; ENTRY; LASSERT(buf != NULL); @@ -936,6 +975,8 @@ int osp_xattr_get(const struct lu_env *env, struct dt_object *dt, if (unlikely(obj->opo_non_exist)) RETURN(-ENOENT); + invalidated = atomic_read(&obj->opo_invalidate_seq); + oxe = osp_oac_xattr_find(obj, name, false); if (oxe != NULL) { spin_lock(&obj->opo_lock); @@ -964,17 +1005,40 @@ unlock: } update = osp_update_request_create(dev); if (IS_ERR(update)) - GOTO(out, rc = PTR_ERR(update)); + GOTO(out_req, rc = PTR_ERR(update)); - rc = osp_update_rpc_pack(env, xattr_get, update, OUT_XATTR_GET, + rc = OSP_UPDATE_RPC_PACK(env, out_xattr_get_pack, update, lu_object_fid(&dt->do_lu), name, buf->lb_len); if (rc != 0) { CERROR("%s: Insert update error "DFID": rc = %d\n", dname, PFID(lu_object_fid(&dt->do_lu)), rc); - GOTO(out, rc); + GOTO(out_req, rc); } rc = osp_remote_sync(env, osp, update, &req); + + down_read(&obj->opo_invalidate_sem); + if (invalidated != atomic_read(&obj->opo_invalidate_seq)) { + /* invalidated has been requested, we can't cache the result */ + if (rc < 0) { + if (rc == -ENOENT) + dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS; + GOTO(out, rc); + } + reply = req_capsule_server_sized_get(&req->rq_pill, + &RMF_OUT_UPDATE_REPLY, + OUT_UPDATE_REPLY_SIZE); + if (reply->ourp_magic != UPDATE_REPLY_MAGIC) { + CERROR("%s: Wrong version %x expected %x "DFID + ": rc = %d\n", dname, reply->ourp_magic, + UPDATE_REPLY_MAGIC, + PFID(lu_object_fid(&dt->do_lu)), -EPROTO); + GOTO(out, rc = -EPROTO); + } + rc = object_update_result_data_get(reply, rbuf, 0); + GOTO(out, rc); + } + if (rc < 0) { if (rc == -ENOENT) { dt->do_lu.lo_header->loh_attr &= ~LOHA_EXISTS; @@ -1017,6 +1081,17 @@ unlock: rc = object_update_result_data_get(reply, rbuf, 0); if (rc < 0 || rbuf->lb_len == 0) { + if (oxe == NULL && rc == -ENODATA) { + oxe = osp_oac_xattr_find_or_add(obj, name, buf->lb_len); + if (oxe == NULL) { + rc = -ENOMEM; + CWARN("%s: Fail to add xattr (%s) to cache for " + DFID" (1): rc = %d\n", dname, name, + PFID(lu_object_fid(&dt->do_lu)), rc); + GOTO(out, rc); + } + } + if (oxe) { spin_lock(&obj->opo_lock); if (unlikely(rc == -ENODATA)) { @@ -1051,6 +1126,9 @@ unlock: GOTO(out, rc); out: + up_read(&obj->opo_invalidate_sem); + +out_req: if (rc > 0 && buf->lb_buf) { if (unlikely(buf->lb_len < rbuf->lb_len)) rc = -ERANGE; @@ -1125,10 +1203,10 @@ int osp_xattr_set(const struct lu_env *env, struct dt_object *dt, const struct lu_buf *buf, const char *name, int fl, struct thandle *th) { - struct osp_object *o = dt2osp_obj(dt); + struct osp_object *o = dt2osp_obj(dt); struct osp_update_request *update; - struct osp_xattr_entry *oxe; - int rc; + struct osp_xattr_entry *oxe; + int rc; ENTRY; update = thandle_to_osp_update_request(th); @@ -1137,7 +1215,7 @@ int osp_xattr_set(const struct lu_env *env, struct dt_object *dt, CDEBUG(D_INODE, DFID" set xattr '%s' with size %zd\n", PFID(lu_object_fid(&dt->do_lu)), name, buf->lb_len); - rc = osp_update_rpc_pack(env, xattr_set, update, OUT_XATTR_SET, + rc = OSP_UPDATE_RPC_PACK(env, out_xattr_set_pack, update, lu_object_fid(&dt->do_lu), buf, name, fl); if (rc != 0) RETURN(rc); @@ -1214,16 +1292,15 @@ int osp_xattr_del(const struct lu_env *env, struct dt_object *dt, const char *name, struct thandle *th) { struct osp_update_request *update; - const struct lu_fid *fid = lu_object_fid(&dt->do_lu); - struct osp_object *o = dt2osp_obj(dt); - struct osp_xattr_entry *oxe; - int rc; + const struct lu_fid *fid = lu_object_fid(&dt->do_lu); + struct osp_object *o = dt2osp_obj(dt); + struct osp_xattr_entry *oxe; + int rc; update = thandle_to_osp_update_request(th); LASSERT(update != NULL); - rc = osp_update_rpc_pack(env, xattr_del, update, OUT_XATTR_DEL, - fid, name); + rc = OSP_UPDATE_RPC_PACK(env, out_xattr_del_pack, update, fid, name); if (rc != 0) return rc; @@ -1268,15 +1345,38 @@ int osp_invalidate(const struct lu_env *env, struct dt_object *dt) CDEBUG(D_HA, "Invalidate osp_object "DFID"\n", PFID(lu_object_fid(&dt->do_lu))); - osp_obj_invalidate_cache(obj); + + /* serialize attr/EA set vs. invalidation */ + down_write(&obj->opo_invalidate_sem); + + /* this should invalidate all in-flights */ + atomic_inc(&obj->opo_invalidate_seq); spin_lock(&obj->opo_lock); - obj->opo_stale = 1; + /* do not mark new objects stale */ + if (obj->opo_attr.la_valid) + obj->opo_stale = 1; + obj->opo_non_exist = 0; spin_unlock(&obj->opo_lock); + osp_obj_invalidate_cache(obj); + + up_write(&obj->opo_invalidate_sem); + RETURN(0); } +bool osp_check_stale(struct dt_object *dt) +{ + struct osp_object *obj = dt2osp_obj(dt); + + if (is_ost_obj(&dt->do_lu) && obj->opo_non_exist) + return true; + + return obj->opo_stale; +} + + /** * Implement OSP layer dt_object_operations::do_declare_create() interface. * @@ -1284,7 +1384,7 @@ int osp_invalidate(const struct lu_env *env, struct dt_object *dt) * * If the transaction is a remote transaction and the FID for the OST-object * has been assigned already, then handle it as creating (remote) MDT object - * via osp_md_declare_object_create(). This function is usually used for LFSCK + * via osp_md_declare_create(). This function is usually used for LFSCK * to re-create the lost OST object. Otherwise, if it is not replay case, the * OSP will reserve pre-created object for the subsequent create operation; * if the MDT side cached pre-created objects are less than some threshold, @@ -1301,12 +1401,10 @@ int osp_invalidate(const struct lu_env *env, struct dt_object *dt) * \retval 0 for success * \retval negative error number on failure */ -static int osp_declare_object_create(const struct lu_env *env, - struct dt_object *dt, - struct lu_attr *attr, - struct dt_allocation_hint *hint, - struct dt_object_format *dof, - struct thandle *th) +static int osp_declare_create(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, + struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) { struct osp_thread_info *osi = osp_env_info(env); struct osp_device *d = lu2osp_dev(dt->do_lu.lo_dev); @@ -1320,7 +1418,7 @@ static int osp_declare_object_create(const struct lu_env *env, if (is_only_remote_trans(th) && !fid_is_zero(fid)) { LASSERT(fid_is_sane(fid)); - rc = osp_md_declare_object_create(env, dt, attr, hint, dof, th); + rc = osp_md_declare_create(env, dt, attr, hint, dof, th); RETURN(rc); } @@ -1347,10 +1445,8 @@ static int osp_declare_object_create(const struct lu_env *env, if (unlikely(!fid_is_zero(fid))) { /* replay case: caller knows fid */ - osi->osi_off = sizeof(osi->osi_id) * d->opd_index; - osi->osi_lb.lb_len = sizeof(osi->osi_id); - osi->osi_lb.lb_buf = NULL; - + osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off, NULL, + d->opd_index); rc = dt_declare_record_write(env, d->opd_last_used_oid_file, &osi->osi_lb, osi->osi_off, local_th); @@ -1374,9 +1470,8 @@ static int osp_declare_object_create(const struct lu_env *env, o->opo_reserved = 1; /* common for all OSPs file hystorically */ - osi->osi_off = sizeof(osi->osi_id) * d->opd_index; - osi->osi_lb.lb_len = sizeof(osi->osi_id); - osi->osi_lb.lb_buf = NULL; + osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off, NULL, + d->opd_index); rc = dt_declare_record_write(env, d->opd_last_used_oid_file, &osi->osi_lb, osi->osi_off, local_th); @@ -1395,7 +1490,7 @@ static int osp_declare_object_create(const struct lu_env *env, * * If the transaction is a remote transaction and the FID for the OST-object * has been assigned already, then handle it as handling MDT object via the - * osp_md_object_create(). For other cases, the OSP will assign FID to the + * osp_md_create(). For other cases, the OSP will assign FID to the * object to be created, and update last_used Object ID (OID) file. * * \param[in] env pointer to the thread context @@ -1409,10 +1504,9 @@ static int osp_declare_object_create(const struct lu_env *env, * \retval 0 for success * \retval negative error number on failure */ -static int osp_object_create(const struct lu_env *env, struct dt_object *dt, - struct lu_attr *attr, - struct dt_allocation_hint *hint, - struct dt_object_format *dof, struct thandle *th) +static int osp_create(const struct lu_env *env, struct dt_object *dt, + struct lu_attr *attr, struct dt_allocation_hint *hint, + struct dt_object_format *dof, struct thandle *th) { struct osp_thread_info *osi = osp_env_info(env); struct osp_device *d = lu2osp_dev(dt->do_lu.lo_dev); @@ -1426,7 +1520,7 @@ static int osp_object_create(const struct lu_env *env, struct dt_object *dt, !fid_is_zero(lu_object_fid(&dt->do_lu))) { LASSERT(fid_is_sane(lu_object_fid(&dt->do_lu))); - rc = osp_md_object_create(env, dt, attr, hint, dof, th); + rc = osp_md_create(env, dt, attr, hint, dof, th); if (rc == 0) o->opo_non_exist = 0; @@ -1479,8 +1573,12 @@ static int osp_object_create(const struct lu_env *env, struct dt_object *dt, if (d->opd_gap_count > 0) { int count = d->opd_gap_count; - ostid_set_id(&osi->osi_oi, - fid_oid(&d->opd_gap_start_fid)); + rc = ostid_set_id(&osi->osi_oi, + fid_oid(&d->opd_gap_start_fid)); + if (rc) { + spin_unlock(&d->opd_pre_lock); + RETURN(rc); + } d->opd_gap_count = 0; spin_unlock(&d->opd_pre_lock); @@ -1496,7 +1594,7 @@ static int osp_object_create(const struct lu_env *env, struct dt_object *dt, /* Only need update last_used oid file, seq file will only be update * during seq rollover */ osp_objid_buf_prep(&osi->osi_lb, &osi->osi_off, - &d->opd_last_used_fid.f_oid, d->opd_index); + &d->opd_last_id, d->opd_index); rc = dt_record_write(env, d->opd_last_used_oid_file, &osi->osi_lb, &osi->osi_off, local_th); @@ -1522,8 +1620,8 @@ static int osp_object_create(const struct lu_env *env, struct dt_object *dt, * \retval 0 for success * \retval negative error number on failure */ -int osp_declare_object_destroy(const struct lu_env *env, - struct dt_object *dt, struct thandle *th) +int osp_declare_destroy(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { struct osp_object *o = dt2osp_obj(dt); struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); @@ -1532,7 +1630,9 @@ int osp_declare_object_destroy(const struct lu_env *env, ENTRY; LASSERT(!osp->opd_connect_mdt); - rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th); + + if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ)) + rc = osp_sync_declare_add(env, o, MDS_UNLINK64_REC, th); RETURN(rc); } @@ -1554,8 +1654,8 @@ int osp_declare_object_destroy(const struct lu_env *env, * \retval 0 for success * \retval negative error number on failure */ -static int osp_object_destroy(const struct lu_env *env, struct dt_object *dt, - struct thandle *th) +static int osp_destroy(const struct lu_env *env, struct dt_object *dt, + struct thandle *th) { struct osp_object *o = dt2osp_obj(dt); struct osp_device *osp = lu2osp_dev(dt->do_lu.lo_dev); @@ -1566,11 +1666,14 @@ static int osp_object_destroy(const struct lu_env *env, struct dt_object *dt, o->opo_non_exist = 1; LASSERT(!osp->opd_connect_mdt); - /* once transaction is committed put proper command on - * the queue going to our OST. */ - rc = osp_sync_add(env, o, MDS_UNLINK64_REC, th, NULL); - if (rc < 0) - RETURN(rc); + + if (!OBD_FAIL_CHECK(OBD_FAIL_LFSCK_LOST_MDTOBJ)) { + /* once transaction is committed put proper command on + * the queue going to our OST. */ + rc = osp_sync_add(env, o, MDS_UNLINK64_REC, th, NULL); + if (rc < 0) + RETURN(rc); + } /* not needed in cache any more */ set_bit(LU_OBJECT_HEARD_BANSHEE, &dt->do_lu.lo_header->loh_flags); @@ -1599,8 +1702,7 @@ static int osp_orphan_index_insert(const struct lu_env *env, struct dt_object *dt, const struct dt_rec *rec, const struct dt_key *key, - struct thandle *handle, - int ignore_quota) + struct thandle *handle) { return -EOPNOTSUPP; } @@ -1670,7 +1772,7 @@ void osp_it_fini(const struct lu_env *env, struct dt_it *di) __free_page(pages[i]); } } - OBD_FREE(pages, npages * sizeof(*pages)); + OBD_FREE_PTR_ARRAY(pages, npages); } OBD_FREE_PTR(it); } @@ -1704,7 +1806,7 @@ static int osp_it_fetch(const struct lu_env *env, struct osp_it *it) npages = min_t(unsigned int, OFD_MAX_BRW_SIZE, 1 << 20); npages /= PAGE_SIZE; - OBD_ALLOC(pages, npages * sizeof(*pages)); + OBD_ALLOC_PTR_ARRAY(pages, npages); if (pages == NULL) RETURN(-ENOMEM); @@ -1753,7 +1855,7 @@ static int osp_it_fetch(const struct lu_env *env, struct osp_it *it) ptlrpc_at_set_req_timeout(req); desc = ptlrpc_prep_bulk_imp(req, npages, 1, - PTLRPC_BULK_PUT_SINK | PTLRPC_BULK_BUF_KIOV, + PTLRPC_BULK_PUT_SINK, MDS_BULK_PORTAL, &ptlrpc_bulk_kiov_pin_ops); if (desc == NULL) @@ -1880,7 +1982,7 @@ again0: if (pages[i] != NULL) __free_page(pages[i]); } - OBD_FREE(pages, it->ooi_total_npages * sizeof(*pages)); + OBD_FREE_PTR_ARRAY(pages, it->ooi_total_npages); it->ooi_pos_page = 0; it->ooi_total_npages = 0; @@ -1932,6 +2034,13 @@ again: it->ooi_pos_ent++; if (it->ooi_pos_ent < idxpage->lip_nr) { if (it->ooi_rec_size == + sizeof(struct lu_orphan_rec_v3)) { + it->ooi_ent = + (struct lu_orphan_ent_v3 *)idxpage->lip_entries+ + it->ooi_pos_ent; + if (it->ooi_swab) + lustre_swab_orphan_ent_v3(it->ooi_ent); + } else if (it->ooi_rec_size == sizeof(struct lu_orphan_rec_v2)) { it->ooi_ent = (struct lu_orphan_ent_v2 *)idxpage->lip_entries+ @@ -1990,7 +2099,13 @@ static int osp_orphan_it_rec(const struct lu_env *env, const struct dt_it *di, struct osp_it *it = (struct osp_it *)di; if (likely(it->ooi_ent)) { - if (it->ooi_rec_size == sizeof(struct lu_orphan_rec_v2)) { + if (it->ooi_rec_size == sizeof(struct lu_orphan_rec_v3)) { + struct lu_orphan_ent_v3 *ent = + (struct lu_orphan_ent_v3 *)it->ooi_ent; + + *(struct lu_orphan_rec_v3 *)rec = ent->loe_rec; + } else if (it->ooi_rec_size == + sizeof(struct lu_orphan_rec_v2)) { struct lu_orphan_ent_v2 *ent = (struct lu_orphan_ent_v2 *)it->ooi_ent; @@ -2107,10 +2222,10 @@ static struct dt_object_operations osp_obj_ops = { .do_xattr_get = osp_xattr_get, .do_declare_xattr_set = osp_declare_xattr_set, .do_xattr_set = osp_xattr_set, - .do_declare_create = osp_declare_object_create, - .do_create = osp_object_create, - .do_declare_destroy = osp_declare_object_destroy, - .do_destroy = osp_object_destroy, + .do_declare_create = osp_declare_create, + .do_create = osp_create, + .do_declare_destroy = osp_declare_destroy, + .do_destroy = osp_destroy, .do_index_try = osp_index_try, }; @@ -2140,6 +2255,7 @@ static int osp_object_init(const struct lu_env *env, struct lu_object *o, o->lo_header->loh_attr |= LOHA_REMOTE; INIT_LIST_HEAD(&po->opo_xattr_list); INIT_LIST_HEAD(&po->opo_invalidate_cb_list); + init_rwsem(&po->opo_invalidate_sem); if (is_ost_obj(o)) { po->opo_obj.do_ops = &osp_obj_ops; @@ -2167,6 +2283,14 @@ static int osp_object_init(const struct lu_env *env, struct lu_object *o, RETURN(rc); } +static void osp_object_free_rcu(struct rcu_head *head) +{ + struct osp_object *obj = container_of(head, struct osp_object, + opo_header.loh_rcu); + + kmem_cache_free(osp_object_kmem, obj); +} + /** * Implement OSP layer lu_object_operations::loo_object_free() interface. * @@ -2195,9 +2319,10 @@ static void osp_object_free(const struct lu_env *env, struct lu_object *o) "Still has %d users on the xattr entry %.*s\n", count-1, (int)oxe->oxe_namelen, oxe->oxe_buf); - OBD_FREE(oxe, oxe->oxe_buflen); + OBD_FREE_LARGE(oxe, oxe->oxe_buflen); } - OBD_SLAB_FREE_PTR(obj, osp_object_kmem); + OBD_FREE_PRE(obj, sizeof(*obj), "slab-freed"); + call_rcu(&obj->opo_header.loh_rcu, osp_object_free_rcu); } /** @@ -2229,6 +2354,14 @@ static void osp_object_release(const struct lu_env *env, struct lu_object *o) d->opd_pre_reserved--; spin_unlock(&d->opd_pre_lock); + /* + * Check that osp_precreate_cleanup_orphans is not blocked + * due to opd_pre_reserved > 0. + */ + if (unlikely(d->opd_pre_reserved == 0 && + (d->opd_pre_recovering || d->opd_pre_status))) + wake_up(&d->opd_pre_waitq); + /* not needed in cache any more */ set_bit(LU_OBJECT_HEARD_BANSHEE, &o->lo_header->loh_flags); }