* @{
*/
+/* echo thread keys carry CL_THREAD flags, which the cl_env functions set directly */
+#define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD)
+#define ECHO_DT_CTX_TAG (LCT_REMEMBER | LCT_DT_THREAD)
+#define ECHO_SES_TAG (LCT_REMEMBER | LCT_SESSION | LCT_SERVER_SESSION)
+
struct echo_device {
struct cl_device ed_cl;
struct echo_client_obd *ed_ec;
struct echo_page {
struct cl_page_slice ep_cl;
- struct mutex ep_lock;
+ unsigned long ep_lock;
};
struct echo_lock {
{
	struct echo_page *ep = cl2echo_page(slice);
-	if (!nonblock)
-		mutex_lock(&ep->ep_lock);
-	else if (!mutex_trylock(&ep->ep_lock))
-		return -EAGAIN;
+	if (nonblock) {
+		/* non-blocking caller: fail with -EAGAIN if already owned,
+		 * exactly as the old mutex_trylock() path did.
+		 */
+		if (test_and_set_bit(0, &ep->ep_lock))
+			return -EAGAIN;
+	} else {
+		/* blocking caller: spin on test_and_set_bit(), sleeping via
+		 * wait_on_bit() until the owner clears bit 0 and wakes us
+		 * (replaces the old unconditional mutex_lock()).
+		 */
+		while (test_and_set_bit(0, &ep->ep_lock))
+			wait_on_bit(&ep->ep_lock, 0, TASK_UNINTERRUPTIBLE);
+	}
	return 0;
}
{
struct echo_page *ep = cl2echo_page(slice);
- LASSERT(mutex_is_locked(&ep->ep_lock));
- mutex_unlock(&ep->ep_lock);
+ LASSERT(test_bit(0, &ep->ep_lock));
+ clear_and_wake_up_bit(0, &ep->ep_lock);
}
static void echo_page_discard(const struct lu_env *env,
static int echo_page_is_vmlocked(const struct lu_env *env,
				 const struct cl_page_slice *slice)
{
-	if (mutex_is_locked(&cl2echo_page(slice)->ep_lock))
+	/* cl_page "is vmlocked" convention: -EBUSY while the ep_lock bit
+	 * is held, -ENODATA (nothing to report) when it is free.
+	 */
+	if (test_bit(0, &cl2echo_page(slice)->ep_lock))
		return -EBUSY;
	return -ENODATA;
}
struct echo_page *ep = cl2echo_page(slice);
(*printer)(env, cookie, LUSTRE_ECHO_CLIENT_NAME"-page@%p %d vm@%p\n",
- ep, mutex_is_locked(&ep->ep_lock),
+ ep, test_bit(0, &ep->ep_lock),
slice->cpl_page->cp_vmpage);
return 0;
}
ENTRY;
get_page(page->cp_vmpage);
- mutex_init(&ep->ep_lock);
+ /*
+ * ep_lock is similar to the lock_page() lock, and
+ * cannot usefully be monitored by lockdep.
+ * So just use a bit in an "unsigned long" and use the
+ * wait_on_bit() interface to wait for the bit to be clear.
+ */
+ ep->ep_lock = 0;
cl_page_slice_add(page, &ep->ep_cl, obj, index, &echo_page_ops);
atomic_inc(&eco->eo_npages);
RETURN(0);
RETURN(0);
}
-static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
+static void echo_object_delete(const struct lu_env *env, struct lu_object *obj)
{
	struct echo_object *eco = cl2echo_obj(lu2cl(obj));
-	struct echo_client_obd *ec = eco->eo_dev->ed_ec;
+	struct echo_client_obd *ec;
	ENTRY;
+
+	/* ->loo_object_delete() is called unconditionally, whether or not
+	 * this layer finished initializing; bail out if it never did
+	 * (eo_dev is only set on successful init).
+	 */
+	if (eco->eo_dev == NULL)
+		return;
+
+	ec = eco->eo_dev->ed_ec;
+
	LASSERT(atomic_read(&eco->eo_npages) == 0);
	spin_lock(&ec->ec_lock);
	list_del_init(&eco->eo_obj_chain);
	spin_unlock(&ec->ec_lock);
-	lu_object_fini(obj);
-	lu_object_header_fini(obj->lo_header);
-
	if (eco->eo_oinfo)
		OBD_FREE_PTR(eco->eo_oinfo);
+}
+
+static void echo_object_free(const struct lu_env *env, struct lu_object *obj)
+{
+ struct echo_object *eco = cl2echo_obj(lu2cl(obj));
+
+ ENTRY;
+
+ lu_object_fini(obj);
+ lu_object_header_fini(obj->lo_header);
OBD_SLAB_FREE_PTR(eco, echo_object_kmem);
EXIT;
static const struct lu_object_operations echo_lu_obj_ops = {
.loo_object_init = echo_object_init,
- .loo_object_delete = NULL,
+ .loo_object_delete = echo_object_delete,
.loo_object_release = NULL,
.loo_object_free = echo_object_free,
.loo_object_print = echo_object_print,
CERROR("Cleanup obd device %s error(%d)\n",
obd->obd_name, rc2);
}
+ /* fallthrough */
case 3:
echo_site_fini(env, ed);
+ /* fallthrough */
case 2:
cl_device_fini(&ed->ed_cl);
+ /* fallthrough */
case 1:
OBD_FREE_PTR(ed);
+ /* fallthrough */
case 0:
default:
break;
}
static void echo_commit_callback(const struct lu_env *env, struct cl_io *io,
-			       struct cl_page *page)
+				 struct pagevec *pvec)
{
	struct echo_thread_info *info;
	struct cl_2queue *queue;
+	int i = 0;
	info = echo_env_info(env);
	LASSERT(io == &info->eti_io);
	queue = &info->eti_queue;
-	cl_page_list_add(&queue->c2_qout, page);
+
+	/* The commit callback now receives a batch (pagevec) instead of a
+	 * single page.  Each vmpage stashes its cl_page in page->private
+	 * (assumes the cl layer set it — as the single-page path relied on);
+	 * move every one onto the "out" queue to mark it committed.
+	 */
+	for (i = 0; i < pagevec_count(pvec); i++) {
+		struct page *vmpage = pvec->pages[i];
+		struct cl_page *page = (struct cl_page *)vmpage->private;
+
+		cl_page_list_add(&queue->c2_qout, page);
+	}
}
static int cl_echo_object_brw(struct echo_object *eco, int rw, u64 offset,
}
}
-#ifdef CONFIG_FS_POSIX_ACL
+#ifdef CONFIG_LUSTRE_FS_POSIX_ACL
if ((ma->ma_need & MA_ACL_DEF) && S_ISDIR(mode)) {
buf->lb_buf = ma->ma_acl;
buf->lb_len = ma->ma_acl_size;
ucred->uc_valid = UCRED_INIT;
}
-#define ECHO_MD_CTX_TAG (LCT_REMEMBER | LCT_MD_THREAD)
-#define ECHO_MD_SES_TAG (LCT_REMEMBER | LCT_SESSION | LCT_SERVER_SESSION)
static int echo_md_handler(struct echo_device *ed, int command,
char *path, int path_len, __u64 id, int count,
struct obd_ioctl_data *data)
if (IS_ERR(env))
RETURN(PTR_ERR(env));
- rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_MD_SES_TAG);
+ rc = lu_env_refill_by_tags(env, ECHO_MD_CTX_TAG, ECHO_SES_TAG);
if (rc != 0)
GOTO(out_env, rc);
struct echo_object *eco;
struct obd_ioctl_data *data = karg;
struct lu_env *env;
+ unsigned long env_tags = 0;
+ __u16 refcheck;
struct obdo *oa;
struct lu_fid fid;
int rw = OBD_BRW_READ;
int rc = 0;
-#ifdef HAVE_SERVER_SUPPORT
- struct lu_context echo_session;
-#endif
ENTRY;
oa = &data->ioc_obdo1;
if (rc < 0)
RETURN(rc);
- OBD_ALLOC_PTR(env);
- if (!env)
- RETURN(-ENOMEM);
+ env = cl_env_get(&refcheck);
+ if (IS_ERR(env))
+ RETURN(PTR_ERR(env));
- rc = lu_env_init(env, LCT_DT_THREAD);
- if (rc)
- GOTO(out_alloc, rc = -ENOMEM);
lu_env_add(env);
- if (rc)
- GOTO(out_env_fini, rc = -ENOMEM);
#ifdef HAVE_SERVER_SUPPORT
- env->le_ses = &echo_session;
- rc = lu_context_init(env->le_ses, LCT_SERVER_SESSION | LCT_NOREF);
- if (unlikely(rc < 0))
- GOTO(out_env, rc);
- lu_context_enter(env->le_ses);
+ if (cmd == OBD_IOC_ECHO_MD || cmd == OBD_IOC_ECHO_ALLOC_SEQ)
+ env_tags = ECHO_MD_CTX_TAG;
+ else
+#endif
+ env_tags = ECHO_DT_CTX_TAG;
+
+ rc = lu_env_refill_by_tags(env, env_tags, ECHO_SES_TAG);
+ if (rc != 0)
+ GOTO(out, rc);
+#ifdef HAVE_SERVER_SUPPORT
tsi = tgt_ses_info(env);
- tsi->tsi_exp = ec->ec_exp;
+ /* treat as local operation */
+ tsi->tsi_exp = NULL;
tsi->tsi_jobid = NULL;
#endif
+
switch (cmd) {
case OBD_IOC_CREATE: /* may create echo object */
if (!cfs_capable(CFS_CAP_SYS_ADMIN))
GOTO(out, rc);
}
case OBD_IOC_ECHO_ALLOC_SEQ: {
- struct lu_env *cl_env;
- __u16 refcheck;
__u64 seq;
int max_count;
if (!cfs_capable(CFS_CAP_SYS_ADMIN))
GOTO(out, rc = -EPERM);
- cl_env = cl_env_get(&refcheck);
- if (IS_ERR(cl_env))
- GOTO(out, rc = PTR_ERR(cl_env));
-
- rc = lu_env_refill_by_tags(cl_env, ECHO_MD_CTX_TAG,
- ECHO_MD_SES_TAG);
- if (rc != 0) {
- cl_env_put(cl_env, &refcheck);
- GOTO(out, rc);
- }
-
- rc = seq_client_get_seq(cl_env, ed->ed_cl_seq, &seq);
- cl_env_put(cl_env, &refcheck);
+ rc = seq_client_get_seq(env, ed->ed_cl_seq, &seq);
if (rc < 0) {
CERROR("%s: Can not alloc seq: rc = %d\n",
obd->obd_name, rc);
EXIT;
out:
-#ifdef HAVE_SERVER_SUPPORT
- lu_context_exit(env->le_ses);
- lu_context_fini(env->le_ses);
-out_env:
-#endif
lu_env_remove(env);
-out_env_fini:
- lu_env_fini(env);
-out_alloc:
- OBD_FREE_PTR(env);
+ cl_env_put(env, &refcheck);
return rc;
}
INIT_LIST_HEAD(&ec->ec_locks);
ec->ec_unique = 0;
+ lu_context_tags_update(ECHO_DT_CTX_TAG);
+ lu_session_tags_update(ECHO_SES_TAG);
+
if (!strcmp(tgt->obd_type->typ_name, LUSTRE_MDT_NAME)) {
#ifdef HAVE_SERVER_SUPPORT
lu_context_tags_update(ECHO_MD_CTX_TAG);
- lu_session_tags_update(ECHO_MD_SES_TAG);
#else
CERROR(
"Local operations are NOT supported on client side. Only remote operations are supported. Metadata client must be run on server side.\n");
if (!ed)
RETURN(0);
+ lu_session_tags_clear(ECHO_SES_TAG & ~LCT_SESSION);
+ lu_context_tags_clear(ECHO_DT_CTX_TAG);
if (ed->ed_next_ismd) {
#ifdef HAVE_SERVER_SUPPORT
lu_context_tags_clear(ECHO_MD_CTX_TAG);
- lu_session_tags_clear(ECHO_MD_SES_TAG);
#else
CERROR(
"This is client-side only module, does not support metadata echo client.\n");