spinlock_t oscc_lock;
struct obd_device *oscc_obd;
int oscc_flags;
+ obd_id oscc_next_id;
+ wait_queue_head_t oscc_waitq;
};
struct ldlm_export_data {
/* Don't conflict with on-wire flags OBD_BRW_WRITE, etc */
#define N_LOCAL_TEMP_PAGE 0x10000000
-typedef int (*obd_obj_alloc_func_t)(obd_id *objid);
-
struct obd_trans_info {
__u64 oti_transno;
__u64 *oti_objid;
struct llog_cookie *oti_logcookies;
int oti_numcookies;
int oti_flags;
- obd_obj_alloc_func_t oti_obj_alloc;
};
static inline void oti_alloc_cookies(struct obd_trans_info *oti,int num_cookies)
struct lov_request *req =
list_entry(pos, struct lov_request, rq_link);
- obd_id *objids = oti->oti_objid;
-
- if (oti->oti_obj_alloc) {
- __u64 next_id;
-
- /*
- * allocating new objid. Here it is delegated to caller,
- * that is MDS in CROW case.
- */
- next_id = oti->oti_obj_alloc(&objids[req->rq_idx]);
- req->rq_oa->o_id = next_id;
- } else {
- /* and here is default "allocator" */
- req->rq_oa->o_id = ++objids[req->rq_idx];
- }
+ /* XXX: LOV STACKING: use real "obj_mdp" sub-data */
+ rc = obd_create(lov->tgts[req->rq_idx].ltd_exp,
+ req->rq_oa, NULL, 0, &req->rq_md, oti);
lov_update_create_set(set, req, rc);
}
rc = lov_fini_create_set(set, ea);
"writing objids file: %d\n", rc);
}
}
+
/*
* I want to see a callback happen when the OBD moves to a "For General
* Use" state, and that's when we'll call set_nextid(). The class driver
if (rc)
GOTO(cleanup, rc);
+ /* we don't set next id manually, instead OSCs will set them
+ * during own recovery from DELORPHAN reply -bzzz */
+#if 0
vals[0] = index;
rc = mds_dt_set_info(obd->obd_self_export, strlen("next_id"),
"next_id", 2, vals);
if (rc)
GOTO(cleanup, rc);
+#endif
obd_llog_finish(obd, &obd->obd_llogs, old_count);
obd_llog_cat_initialize(obd, &obd->obd_llogs, count, name);
return ERR_PTR(rc);
}
-/* this is object id allocation callback */
-static int mds_obj_alloc(obd_id *objid)
-{
- ENTRY;
- LASSERT(objid != NULL);
- RETURN(++(*objid));
-}
-
static inline void
mds_objids_from_lmm(obd_id *ids, struct lov_mds_md *lmm,
struct lov_desc *desc)
/* must be called with i_sem held */
int
-mds_create_object(struct obd_device *obd, struct ptlrpc_request *req,
- int offset, struct mds_update_record *rec,
- struct dentry *dchild, void **handle,
- obd_id *ids)
+mds_create_objects(struct obd_device *obd, struct ptlrpc_request *req,
+ int offset, struct mds_update_record *rec,
+ struct dentry *dchild, void **handle,
+ obd_id **ids)
{
struct inode *inode = dchild->d_inode;
struct mds_obd *mds = &obd->u.mds;
if (body->valid & OBD_MD_FLEASIZE)
RETURN(0);
- oti.oti_objid = ids;
+ OBD_ALLOC(*ids, mds->mds_dt_desc.ld_tgt_count * sizeof(**ids));
+ if (*ids == NULL)
+ RETURN(-ENOMEM);
+ oti.oti_objid = *ids;
/* replay case */
if (lustre_msg_get_flags(req->rq_reqmsg) & MSG_REPLAY) {
RETURN(rc);
}
- /*
- * FIXME: this is evil layering violation, all things related to
- * stripping should be done by LOV. --umka.
- */
- mds_objids_from_lmm(ids, lmm, &mds->mds_dt_desc);
+ mds_objids_from_lmm(*ids, lmm, &mds->mds_dt_desc);
lmm_buf = lustre_msg_buf(req->rq_repmsg, offset, 0);
lmm_bufsize = req->rq_repmsg->buflens[offset];
-
LASSERT(lmm_buf != NULL);
LASSERT(lmm_bufsize >= lmm_size);
GOTO(out_oa, rc);
}
- /*
- * create with CROW flag and base ids for allocating new ids on
- * them.
- */
- oti.oti_flags |= OBD_MODE_CROW;
- oti.oti_obj_alloc = mds_obj_alloc;
-
LASSERT(oa->o_gr >= FILTER_GROUP_FIRST_MDS);
+ oti.oti_flags |= OBD_MODE_CROW;
rc = obd_create(mds->mds_dt_exp, oa, NULL, 0, &lsm, &oti);
if (rc) {
}
if (rec != NULL) {
- /* no EA: create objects */
if ((body->valid & OBD_MD_FLEASIZE) &&
(rec->ur_flags & MDS_OPEN_HAS_EA)) {
up(&dchild->d_inode->i_sem);
}
if (!(body->valid & OBD_MD_FLEASIZE)) {
- int ids_size = mds->mds_dt_desc.ld_tgt_count * sizeof(*ids);
-
- OBD_ALLOC(ids, ids_size);
- if (ids == NULL) {
- up(&dchild->d_inode->i_sem);
- RETURN(-ENOMEM);
- }
-
- /*
- * synchronizing object creating to prevent another
- * threads take the same base objid values.
- */
- down(&mds->mds_create_sem);
-
- /* preparing base ids */
- mds_dt_save_objids(obd, ids);
-
- /*
- * create objects, @ids will contain new allocated obj
- * ids.
- */
- rc = mds_create_object(obd, req, 2, rec,
- dchild, handle, ids);
+ /* no EA: create objects */
+ rc = mds_create_objects(obd, req, 2, rec,
+ dchild, handle, &ids);
if (rc) {
CERROR("mds_create_object: rc = %d\n", rc);
- up(&mds->mds_create_sem);
up(&dchild->d_inode->i_sem);
- OBD_FREE(ids, ids_size);
RETURN(rc);
}
-
- /*
- * update MDS objids by new ones allocated in
- * mds_create_object().
- */
- mds_dt_update_objids(obd, ids);
- OBD_FREE(ids, ids_size);
-
- up(&mds->mds_create_sem);
}
if (S_ISREG(dchild->d_inode->i_mode) &&
CDEBUG(D_INODE, "mfd %p, cookie "LPX64"\n", mfd,
mfd->mfd_handle.h_cookie);
+
+ if (ids != NULL) {
+ mds_dt_update_objids(obd, ids);
+ OBD_FREE(ids, sizeof(*ids) * mds->mds_dt_desc.ld_tgt_count);
+ }
+
RETURN(rc);
}
CDEBUG(D_HA, "%s:["LPU64"] after destroy: set last_objids = "
LPU64"\n", exp->exp_obd->obd_name, doa->o_gr, oa->o_id);
+ /* return next free id to be used as a new start of sequence -bzzz */
+ oa->o_id = last + 1;
+
filter_set_last_id(filter, oa->o_gr, oa->o_id);
clear_bit(doa->o_gr, &filter->fo_destroys_in_progress);
up(&filter->fo_create_locks[oa->o_gr]);
#include <linux/obd_class.h>
#include "osc_internal.h"
+int oscc_recovering(struct osc_creator *oscc)
+{
+ int recov = 0;
+
+ spin_lock(&oscc->oscc_lock);
+ recov = oscc->oscc_flags & OSCC_FLAG_RECOVERING;
+ spin_unlock(&oscc->oscc_lock);
+
+ return recov;
+}
+
/* this only is used now for deleting orphanes */
int osc_create(struct obd_export *exp, struct obdo *oa,
void *acl, int acl_size, struct lov_stripe_md **ea,
struct obd_trans_info *oti)
{
struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
- int rc = 0;
+ int rc = 0, try_again = 1;
ENTRY;
LASSERT(oa);
RETURN(rc);
}
- LBUG();
- RETURN(0);
+ while (try_again) {
+ /* If orphans are being recovered, then we must wait until
+ it is finished before we can continue with create. */
+ if (oscc_recovering(oscc)) {
+ struct l_wait_info lwi;
+
+ CDEBUG(D_HA,"%p: oscc recovery in progress, waiting\n",
+ oscc);
+
+ lwi = LWI_TIMEOUT(MAX(obd_timeout*HZ/4, 1), NULL, NULL);
+ rc = l_wait_event(oscc->oscc_waitq,
+ !oscc_recovering(oscc), &lwi);
+ LASSERT(rc == 0 || rc == -ETIMEDOUT);
+ if (rc == -ETIMEDOUT) {
+ CDEBUG(D_HA,"%p: timeout waiting on recovery\n",
+ oscc);
+ RETURN(rc);
+ }
+ CDEBUG(D_HA, "%p: oscc recovery over, waking up\n",
+ oscc);
+ }
+
+ spin_lock(&oscc->oscc_lock);
+ if (oscc->oscc_flags & OSCC_FLAG_EXITING) {
+ spin_unlock(&oscc->oscc_lock);
+ break;
+ }
+
+ if (oscc->oscc_flags & OSCC_FLAG_NOSPC) {
+ rc = -ENOSPC;
+ spin_unlock(&oscc->oscc_lock);
+ break;
+ }
+
+ oscc->oscc_next_id++;
+ oa->o_id = oscc->oscc_next_id;
+ try_again = 0;
+ spin_unlock(&oscc->oscc_lock);
+
+ CDEBUG(D_HA, "%s: returning objid "LPU64"\n",
+ oscc->oscc_obd->u.cli.cl_import->imp_target_uuid.uuid,
+ oa->o_id);
+ }
+
+ RETURN(rc);
}
void oscc_init(struct obd_device *obd)
oscc->oscc_obd = obd;
spin_lock_init(&oscc->oscc_lock);
oscc->oscc_flags |= OSCC_FLAG_RECOVERING;
+ init_waitqueue_head(&oscc->oscc_waitq);
}
int osc_real_create(struct obd_export *exp, struct obdo *oa,
struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
+ struct osc_creator *oscc = &exp->exp_obd->u.cli.cl_oscc;
struct ptlrpc_request *request;
struct ost_body *body;
struct lov_stripe_md *lsm;
GOTO (out_req, rc = -EPROTO);
}
+ if ((oa->o_valid & OBD_MD_FLFLAGS) && oa->o_flags == OBD_FL_DELORPHAN) {
+ struct obd_import *imp = class_exp2cliimp(exp);
+ /* MDS declares last known object, OSS responses
+ * with next possible object -bzzz */
+ spin_lock(&oscc->oscc_lock);
+ oscc->oscc_next_id = body->oa.o_id;
+ spin_unlock(&oscc->oscc_lock);
+ CDEBUG(D_HA, "%s: set nextid "LPD64" after recovery\n",
+ imp->imp_target_uuid.uuid, oa->o_id);
+ }
memcpy(oa, &body->oa, sizeof(*oa));
/* This should really be sent by the OST */