__u8 *uuid_ptr;
char *str, *label;
char ns_name[48];
- int rc;
+ int rc, i;
ENTRY;
if (lcfg->lcfg_bufcount < 3 ||
if (rc)
GOTO(err_ops, rc);
- filter->fo_destroy_in_progress = 0;
- sema_init(&filter->fo_create_lock, 1);
+ filter->fo_destroys_in_progress = 0;
+ for (i = 0; i < 32; i++)
+ sema_init(&filter->fo_create_locks[i], 1);
+
spin_lock_init(&filter->fo_translock);
spin_lock_init(&filter->fo_objidlock);
INIT_LIST_HEAD(&filter->fo_export_list);
RETURN(lsm_size);
}
-/* caller must hold fo_create_lock */
+/* caller must hold fo_create_locks[oa->o_gr] */
static int filter_destroy_precreated(struct obd_export *exp, struct obdo *oa,
struct filter_obd *filter)
{
ENTRY;
LASSERT(oa);
- LASSERT(down_trylock(&filter->fo_create_lock) != 0);
-
- memset(&doa, 0, sizeof(doa));
-
LASSERT(oa->o_gr != 0);
LASSERT(oa->o_valid & OBD_MD_FLGROUP);
+ LASSERT(down_trylock(&filter->fo_create_locks[oa->o_gr]) != 0);
+
+ memset(&doa, 0, sizeof(doa));
doa.o_valid |= OBD_MD_FLGROUP;
doa.o_gr = oa->o_gr;
doa.o_mode = S_IFREG;
- if (!filter->fo_destroy_in_progress) {
- CERROR("%s: destroy_in_progress already cleared\n",
- exp->exp_obd->obd_name);
+ if (!test_bit(doa.o_gr, &filter->fo_destroys_in_progress)) {
+ CERROR("%s:["LPU64"] destroys_in_progress already cleared\n",
+ exp->exp_obd->obd_name, doa.o_gr);
RETURN(0);
}
last = filter_last_id(filter, doa.o_gr);
+
CWARN("%s: deleting orphan objects from "LPU64" to "LPU64"\n",
exp->exp_obd->obd_name, oa->o_id + 1, last);
+
for (id = last; id > oa->o_id; id--) {
doa.o_id = id;
rc = filter_destroy(exp, &doa, NULL, NULL, NULL);
exp->exp_obd->obd_name, doa.o_gr, oa->o_id);
rc = filter_update_last_objid(exp->exp_obd, doa.o_gr, 1);
- filter->fo_destroy_in_progress = 0;
-
+ clear_bit(doa.o_gr, &filter->fo_destroys_in_progress);
RETURN(0);
}
/* delete orphans request */
if ((oa->o_valid & OBD_MD_FLFLAGS) && (oa->o_flags & OBD_FL_DELORPHAN)){
/* This causes inflight precreates to abort and drop lock */
- filter->fo_destroy_in_progress = 1;
- down(&filter->fo_create_lock);
+ set_bit(group, &filter->fo_destroys_in_progress);
+ down(&filter->fo_create_locks[group]);
+ if (!test_bit(group, &filter->fo_destroys_in_progress)) {
+ CERROR("%s:["LPU64"] destroys_in_progress already cleared\n",
+ exp->exp_obd->obd_name, group);
+ up(&filter->fo_create_locks[group]);
+ RETURN(0);
+ }
diff = oa->o_id - filter_last_id(filter, group);
CDEBUG(D_HA, "filter_last_id() = "LPU64" -> diff = %d\n",
filter_last_id(filter, group), diff);
"orphans were deleted\n", obd->obd_name);
GOTO(out, rc);
} else {
- /*XXX used by MDS for the first time! */
- filter->fo_destroy_in_progress = 0;
+ /* XXX: Used by MDS for the first time! */
+ clear_bit(group, &filter->fo_destroys_in_progress);
}
} else {
- down(&filter->fo_create_lock);
+ down(&filter->fo_create_locks[group]);
if (oti->oti_conn_cnt < exp->exp_conn_cnt) {
CERROR("%s: dropping old precreate request\n",
obd->obd_name);
/* else diff == 0 */
GOTO(out, rc = 0);
out:
- up(&filter->fo_create_lock);
+ up(&filter->fo_create_locks[group]);
return rc;
}
* thread 2: creates object (x + 1)
* thread 1: tries to create object x, gets -ENOSPC
*
- * Caller must hold fo_create_lock
+ * Caller must hold fo_create_locks[group]
*/
static int filter_precreate(struct obd_device *obd, struct obdo *oa,
obd_gr group, int *num)
filter = &obd->u.filter;
- LASSERT(down_trylock(&filter->fo_create_lock) != 0);
+ LASSERT(down_trylock(&filter->fo_create_locks[group]) != 0);
if ((oa->o_valid & OBD_MD_FLFLAGS) &&
(oa->o_flags & OBD_FL_RECREATE_OBJS)) {
for (i = 0; i < *num && err == 0; i++) {
int cleanup_phase = 0;
- if (filter->fo_destroy_in_progress) {
- CWARN("%s: precreate aborted by destroy\n",
+ if (test_bit(group, &filter->fo_destroys_in_progress)) {
+ CWARN("%s: create aborted by destroy\n",
obd->obd_name);
- rc = -EAGAIN;
- break;
+ GOTO(cleanup, rc = -EALREADY);
}
-
+
if (recreate_obj) {
__u64 last_id;
next_id = oa->o_id;
rc = -EINVAL;
} else {
diff = 1;
- down(&filter->fo_create_lock);
+ down(&filter->fo_create_locks[oa->o_gr]);
rc = filter_precreate(obd, oa, oa->o_gr, &diff);
- up(&filter->fo_create_lock);
+ up(&filter->fo_create_locks[oa->o_gr]);
}
} else {
rc = filter_handle_precreate(exp, oa, oa->o_gr, oti);