spin_lock_init(&cli->cl_read_page_hist.oh_lock);
spin_lock_init(&cli->cl_write_page_hist.oh_lock);
+ cli->cl_dstr_in_flight = 0;
+ cli->cl_max_dstr_in_flight = OST_MAX_THREADS;
+ init_waitqueue_head(&cli->cl_wait_for_destroy_slot);
+
memset(&cli->cl_last_write_time, 0,
sizeof(cli->cl_last_write_time));
int rc = 0, cleanup_phase = 0;
int unlink_by_id = 0;
int update_mode;
+ int destroy_objects = 0;
ENTRY;
LASSERT(offset == 1 || offset == 3);
&lcl) > 0){
body->valid |= OBD_MD_FLCOOKIE;
}
-
- rc = mds_destroy_object(obd, child_inode, 1);
- if (rc) {
- CERROR("can't remove OST object, err %d\n",
- rc);
- }
+
+ /* destroy OST objects when diskfs transaction is closed */
+ destroy_objects = 1;
if (child_inode->i_nlink == 0)
mds_fidmap_del(obd, &body->id1);
case 5: /* pending_dir semaphore */
up(&mds->mds_pending_dir->d_inode->i_sem);
case 4: /* child inode semaphore */
+ if (destroy_objects) {
+ rc = mds_destroy_object(obd, child_inode, 1);
+ if (rc)
+ CERROR("can't remove OST object, err %d\n", rc);
+ }
UP_READ_I_ALLOC_SEM(child_inode);
/* handle splitted dir */
if (rc == 0) {
return rc;
}
+static int osc_destroy_interpret(struct ptlrpc_request *req,
+ void *arg, int rc)
+{
+ struct obd_import *imp = req->rq_import;
+ struct client_obd *cli = &imp->imp_obd->u.cli;
+ ENTRY;
+ spin_lock(&cli->cl_loi_list_lock);
+ cli->cl_dstr_in_flight--;
+ LASSERT(cli->cl_dstr_in_flight >= 0);
+ LASSERT(cli->cl_dstr_in_flight <= cli->cl_dstr_in_flight);
+ spin_unlock(&cli->cl_loi_list_lock);
+ wake_up(&cli->cl_wait_for_destroy_slot);
+ RETURN(rc);
+}
+
+static void osc_grab_destroy_slot(struct client_obd *cli)
+{
+ spin_lock(&cli->cl_loi_list_lock);
+ while (1) {
+ struct l_wait_info lwi = { 0 };
+
+ if (cli->cl_dstr_in_flight < cli->cl_max_dstr_in_flight) {
+ cli->cl_dstr_in_flight++;
+ break;
+ }
+
+ spin_unlock(&cli->cl_loi_list_lock);
+ l_wait_event(cli->cl_wait_for_destroy_slot,
+ cli->cl_dstr_in_flight < cli->cl_max_dstr_in_flight,
+ &lwi);
+ spin_lock(&cli->cl_loi_list_lock);
+ }
+ spin_unlock(&cli->cl_loi_list_lock);
+}
+
static int osc_destroy(struct obd_export *exp, struct obdo *oa,
struct lov_stripe_md *ea, struct obd_trans_info *oti)
{
+ struct obd_import *imp = class_exp2cliimp(exp);
+ struct client_obd *cli = &imp->imp_obd->u.cli;
struct ptlrpc_request *request;
struct ost_body *body;
int rc, size = sizeof(*body);
request->rq_replen = lustre_msg_size(1, &size);
if (oti != NULL && (oti->oti_flags & OBD_MODE_ASYNC)) {
+ osc_grab_destroy_slot(cli);
+ request->rq_interpret_reply = osc_destroy_interpret;
ptlrpcd_add_req(request);
rc = 0;
} else {