RETURN(rc);
}
+int noinline mds_get_open_lock(struct obd_device *obd, struct dentry *dchild,
+ int child_mode, struct lustre_handle *child_lockh,
+ struct ldlm_reply *rep)
+{
+ ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN } };
+ struct ldlm_res_id child_res_id;
+ int rc = 0, lock_flags = 0;
+
+ /* In case of replay we do not get a lock assuming that the
+ caller has it already */
+ memset(&child_res_id, 0, sizeof(child_res_id));
+ child_res_id.name[0] = dchild->d_inode->i_ino;
+ child_res_id.name[1] = dchild->d_inode->i_generation;
+
+ rc = ldlm_cli_enqueue_local(obd->obd_namespace, &child_res_id,
+ LDLM_IBITS, &policy, child_mode,
+ &lock_flags, ldlm_blocking_ast,
+ ldlm_completion_ast, NULL, NULL,
+ 0, NULL, child_lockh);
+ if (rc != ELDLM_OK)
+ goto out;
+
+ /* Let mds_intent_policy know that we have a lock to return */
+ ldlm_reply_set_disposition(rep, DISP_OPEN_LOCK);
+
+out:
+ return rc;
+}
+
int mds_open(struct mds_update_record *rec, int offset,
struct ptlrpc_request *req, struct lustre_handle *child_lockh)
{
int child_mode = LCK_CR;
/* Always returning LOOKUP lock if open succesful to guard
dentry on client. */
- int lock_flags = 0;
int quota_pending[2] = {0, 0};
int use_parent, need_open_lock;
unsigned int gid = current_fsgid();
}
if (need_open_lock) {
- ldlm_policy_data_t policy = { .l_inodebits = { MDS_INODELOCK_LOOKUP | MDS_INODELOCK_OPEN } };
- struct ldlm_res_id child_res_id;
-
- /* In case of replay we do not get a lock assuming that the
- caller has it already */
- memset(&child_res_id, 0, sizeof(child_res_id));
- child_res_id.name[0] = dchild->d_inode->i_ino;
- child_res_id.name[1] = dchild->d_inode->i_generation;
-
- rc = ldlm_cli_enqueue_local(obd->obd_namespace, &child_res_id,
- LDLM_IBITS, &policy, child_mode,
- &lock_flags, ldlm_blocking_ast,
- ldlm_completion_ast, NULL, NULL,
- 0, NULL, child_lockh);
+ rc = mds_get_open_lock(obd, dchild, child_mode, child_lockh, rep);
if (rc != ELDLM_OK)
GOTO(cleanup, rc);
-
- /* Let mds_intent_policy know that we have a lock to return */
- ldlm_reply_set_disposition(rep, DISP_OPEN_LOCK);
}
if (!S_ISREG(dchild->d_inode->i_mode) &&
return (-ETIMEDOUT);
}
+static void noinline ptlrpc_wait_event(struct ptlrpc_service *svc,
+ struct ptlrpc_thread *thread)
+{
+ /* Don't exit while there are replies to be handled */
+ struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
+ ptlrpc_retry_rqbds, svc);
+
+ lc_watchdog_disable(thread->t_watchdog);
+
+ cfs_cond_resched();
+
+ l_wait_event_exclusive (svc->srv_waitq,
+ ((thread->t_flags & SVC_STOPPING) != 0 &&
+ svc->srv_n_difficult_replies == 0) ||
+ (!list_empty(&svc->srv_idle_rqbds) &&
+ svc->srv_rqbd_timeout == 0) ||
+ !list_empty(&svc->srv_req_in_queue) ||
+ !list_empty(&svc->srv_reply_queue) ||
+ (ptlrpc_server_request_pending(svc, 0) &&
+ (svc->srv_n_active_reqs <
+ (svc->srv_threads_running - 1))) ||
+ svc->srv_at_check,
+ &lwi);
+
+ lc_watchdog_touch(thread->t_watchdog, GET_TIMEOUT(svc));
+}
+
static int ptlrpc_main(void *arg)
{
struct ptlrpc_svc_data *data = (struct ptlrpc_svc_data *)arg;
while ((thread->t_flags & SVC_STOPPING) == 0 ||
svc->srv_n_difficult_replies != 0) {
- /* Don't exit while there are replies to be handled */
- struct l_wait_info lwi = LWI_TIMEOUT(svc->srv_rqbd_timeout,
- ptlrpc_retry_rqbds, svc);
-
- lc_watchdog_disable(thread->t_watchdog);
-
- cfs_cond_resched();
-
- l_wait_event_exclusive (svc->srv_waitq,
- ((thread->t_flags & SVC_STOPPING) != 0 &&
- svc->srv_n_difficult_replies == 0) ||
- (!list_empty(&svc->srv_idle_rqbds) &&
- svc->srv_rqbd_timeout == 0) ||
- !list_empty(&svc->srv_req_in_queue) ||
- !list_empty(&svc->srv_reply_queue) ||
- (ptlrpc_server_request_pending(svc, 0) &&
- (svc->srv_n_active_reqs <
- (svc->srv_threads_running - 1))) ||
- svc->srv_at_check,
- &lwi);
-
- lc_watchdog_touch(thread->t_watchdog, GET_TIMEOUT(svc));
+ ptlrpc_wait_event(svc, thread);
ptlrpc_check_rqbd_pool(svc);