X-Git-Url: https://git.whamcloud.com/?p=fs%2Flustre-release.git;a=blobdiff_plain;f=lustre%2Fquota%2Fqsd_lib.c;h=277791b08e59cde130440c11420b4b867ab3abcf;hp=65208d2a282907f0462b240eaa0df876f2ab7e44;hb=3aa4a85d37563db1e87b7da9e621bc33ffad0108;hpb=c40c7939c0110ca53151c47249d85e56a6e0f24d diff --git a/lustre/quota/qsd_lib.c b/lustre/quota/qsd_lib.c index 65208d2..277791b 100644 --- a/lustre/quota/qsd_lib.c +++ b/lustre/quota/qsd_lib.c @@ -58,6 +58,19 @@ #include #include "qsd_internal.h" +cfs_mem_cache_t *upd_kmem; + +struct lu_kmem_descr qsd_caches[] = { + { + .ckd_cache = &upd_kmem, + .ckd_name = "upd_kmem", + .ckd_size = sizeof(struct qsd_upd_rec) + }, + { + .ckd_cache = NULL + } +}; + /* define qsd thread key */ LU_KEY_INIT_FINI(qsd, struct qsd_thread_info); LU_CONTEXT_KEY_DEFINE(qsd, LCT_MD_THREAD | LCT_DT_THREAD | LCT_LOCAL); @@ -68,16 +81,40 @@ static int lprocfs_qsd_rd_state(char *page, char **start, off_t off, int count, int *eof, void *data) { struct qsd_instance *qsd = (struct qsd_instance *)data; + char enabled[5]; + int rc; LASSERT(qsd != NULL); - return snprintf(page, count, - "target name: %s\n" - "pool ID: %d\n" - "type: %s\n" - "quota enabled: none\n", - qsd->qsd_svname, qsd->qsd_pool_id, - qsd->qsd_is_md ? "md" : "dt"); + memset(enabled, 0, sizeof(enabled)); + if (qsd_type_enabled(qsd, USRQUOTA)) + strcat(enabled, "u"); + if (qsd_type_enabled(qsd, GRPQUOTA)) + strcat(enabled, "g"); + if (strlen(enabled) == 0) + strcat(enabled, "none"); + + rc = snprintf(page, count, + "target name: %s\n" + "pool ID: %d\n" + "type: %s\n" + "quota enabled: %s\n" + "conn to master: %s\n", + qsd->qsd_svname, qsd->qsd_pool_id, + qsd->qsd_is_md ? "md" : "dt", enabled, + qsd->qsd_exp_valid ? "setup" : "not setup yet"); + + if (qsd->qsd_prepared) + rc += snprintf(page + rc, count - rc, + "user uptodate: glb[%d],slv[%d],reint[%d]\n" + "group uptodate: glb[%d],slv[%d],reint[%d]\n", + qsd->qsd_type_array[USRQUOTA]->qqi_glb_uptodate, + qsd->qsd_type_array[USRQUOTA]->qqi_slv_uptodate, + qsd->qsd_type_array[USRQUOTA]->qqi_reint, + qsd->qsd_type_array[GRPQUOTA]->qqi_glb_uptodate, + qsd->qsd_type_array[GRPQUOTA]->qqi_slv_uptodate, + qsd->qsd_type_array[GRPQUOTA]->qqi_reint); + return rc; } static int lprocfs_qsd_rd_enabled(char *page, char **start, off_t off, @@ -105,6 +142,47 @@ static struct lprocfs_vars lprocfs_quota_qsd_vars[] = { }; /* + * Callback function invoked by the OSP layer when the connection to the master + * has been set up. + * + * \param data - is a pointer to the qsd_instance + * + * \retval - 0 on success, appropriate error on failure + */ +static int qsd_conn_callback(void *data) +{ + struct qsd_instance *qsd = (struct qsd_instance *)data; + int type; + ENTRY; + + /* qsd_exp should now be valid */ + LASSERT(qsd->qsd_exp); + + /* grab reference on namespace */ + ldlm_namespace_get(class_exp2obd(qsd->qsd_exp)->obd_namespace); + qsd->qsd_ns = class_exp2obd(qsd->qsd_exp)->obd_namespace; + + cfs_write_lock(&qsd->qsd_lock); + /* notify that qsd_exp is now valid */ + qsd->qsd_exp_valid = true; + cfs_write_unlock(&qsd->qsd_lock); + + /* Now that the connection to master is setup, we can initiate the + * reintegration procedure for quota types which are enabled. + * It is worth noting that, if the qsd_instance hasn't been started + * already, then we can only complete the first two steps of the + * reintegration procedure (i.e. global lock enqueue and slave + * index transfer) since the space usage reconciliation (i.e. + * step 3) will have to wait for qsd_start() to be called */ + for (type = USRQUOTA; type < MAXQUOTAS; type++) { + struct qsd_qtype_info *qqi = qsd->qsd_type_array[type]; + cfs_waitq_signal(&qqi->qqi_reint_thread.t_ctl_waitq); + } + + RETURN(0); +} + +/* * Release qsd_qtype_info structure which contains data associated with a * given quota type. This releases the accounting objects. * It's called on OSD cleanup when the qsd instance is released. @@ -125,6 +203,16 @@ static void qsd_qtype_fini(const struct lu_env *env, struct qsd_instance *qsd, qqi = qsd->qsd_type_array[qtype]; qsd->qsd_type_array[qtype] = NULL; + /* all deferred work lists should be empty */ + LASSERT(cfs_list_empty(&qqi->qqi_deferred_glb)); + LASSERT(cfs_list_empty(&qqi->qqi_deferred_slv)); + + /* shutdown lquota site */ + if (qqi->qqi_site != NULL && !IS_ERR(qqi->qqi_site)) { + lquota_site_free(env, qqi->qqi_site); + qqi->qqi_site = NULL; + } + /* by now, all qqi users should have gone away */ LASSERT(cfs_atomic_read(&qqi->qqi_ref) == 1); lu_ref_fini(&qqi->qqi_reference); @@ -193,17 +281,22 @@ static int qsd_qtype_init(const struct lu_env *env, struct qsd_instance *qsd, qqi->qqi_glb_uptodate = false; qqi->qqi_slv_uptodate = false; qqi->qqi_reint = false; + cfs_waitq_init(&qqi->qqi_reint_thread.t_ctl_waitq); + thread_set_flags(&qqi->qqi_reint_thread, SVC_STOPPED); + CFS_INIT_LIST_HEAD(&qqi->qqi_deferred_glb); + CFS_INIT_LIST_HEAD(&qqi->qqi_deferred_slv); memset(&qqi->qqi_lockh, 0, sizeof(qqi->qqi_lockh)); - /* open accounting object */ - LASSERT(qqi->qqi_acct_obj == NULL); - qqi->qqi_acct_obj = acct_obj_lookup(env, qsd->qsd_dev, - qtype == USRQUOTA ? ACCT_USER_OID - : ACCT_GROUP_OID); - /* don't print any error message on failure in order not to confuse - * non-OFD user (e.g. 2.3 MDT stack) */ - if (IS_ERR(qqi->qqi_acct_obj)) - qqi->qqi_acct_obj = NULL; + /* open accounting object */ + LASSERT(qqi->qqi_acct_obj == NULL); + qqi->qqi_acct_obj = acct_obj_lookup(env, qsd->qsd_dev, qtype); + if (qqi->qqi_acct_obj == NULL) { + LCONSOLE_ERROR("%s: No %s space accounting support. Please use " + "tunefs.lustre --quota option to enable quota " + "accounting.\n", + qsd->qsd_svname, QTYPE_NAME(qtype)); + GOTO(out, rc = -ENOENT); + } /* open global index copy */ LASSERT(qqi->qqi_glb_obj == NULL); @@ -233,17 +326,24 @@ static int qsd_qtype_init(const struct lu_env *env, struct qsd_instance *qsd, } qqi->qqi_slv_ver = dt_version_get(env, qqi->qqi_slv_obj); + /* allocate site */ + qqi->qqi_site = lquota_site_alloc(env, qqi, false, qtype, &qsd_lqe_ops); + if (IS_ERR(qqi->qqi_site)) { + CERROR("%s: can't allocate site "DFID" %ld\n", qsd->qsd_svname, + PFID(&qqi->qqi_fid), PTR_ERR(qqi->qqi_site)); + GOTO(out, rc = PTR_ERR(qqi->qqi_site)); + } + /* register proc entry for accounting object */ rc = lprocfs_seq_create(qsd->qsd_proc, qtype == USRQUOTA ? "acct_user" : "acct_group", 0444, &lprocfs_quota_seq_fops, qqi->qqi_acct_obj); if (rc) { - CWARN("%s: can't add procfs entry for accounting file %d\n", - qsd->qsd_svname, rc); + CERROR("%s: can't add procfs entry for accounting file %d\n", + qsd->qsd_svname, rc); GOTO(out, rc); } - EXIT; out: if (rc) @@ -253,7 +353,8 @@ out: /* * Release a qsd_instance. Companion of qsd_init(). This releases all data - * structures associated with the quota slave. + * structures associated with the quota slave (on-disk objects, lquota entry + * tables, ...). * This function should be called when the OSD is shutting down. * * \param env - is the environment passed by the caller @@ -265,7 +366,9 @@ void qsd_fini(const struct lu_env *env, struct qsd_instance *qsd) ENTRY; CDEBUG(D_QUOTA, "%s: initiating QSD shutdown\n", qsd->qsd_svname); + cfs_write_lock(&qsd->qsd_lock); qsd->qsd_stopping = true; + cfs_write_unlock(&qsd->qsd_lock); /* remove from the list of fsinfo */ if (!cfs_list_empty(&qsd->qsd_link)) { @@ -281,10 +384,30 @@ void qsd_fini(const struct lu_env *env, struct qsd_instance *qsd) qsd->qsd_proc = NULL; } + /* stop the writeback thread */ + qsd_stop_upd_thread(qsd); + + /* shutdown the reintegration threads */ + for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) { + if (qsd->qsd_type_array[qtype] == NULL) + continue; + qsd_stop_reint_thread(qsd->qsd_type_array[qtype]); + } + + /* release reference on namespace */ + if (qsd->qsd_ns != NULL) { + ldlm_namespace_put(qsd->qsd_ns); + qsd->qsd_ns = NULL; + } + /* free per-quota type data */ for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) qsd_qtype_fini(env, qsd, qtype); + /* deregister connection to the quota master */ + qsd->qsd_exp_valid = false; + lustre_deregister_osp_item(&qsd->qsd_exp); + /* release per-filesystem information */ if (qsd->qsd_fsinfo != NULL) qsd_put_fsinfo(qsd->qsd_fsinfo); @@ -302,6 +425,7 @@ void qsd_fini(const struct lu_env *env, struct qsd_instance *qsd) qsd->qsd_dev = NULL; } + CDEBUG(D_QUOTA, "%s: QSD shutdown completed\n", qsd->qsd_svname); OBD_FREE_PTR(qsd); EXIT; } @@ -326,16 +450,30 @@ struct qsd_instance *qsd_init(const struct lu_env *env, char *svname, { struct qsd_thread_info *qti = qsd_info(env); struct qsd_instance *qsd; - int rc; + int rc, type, idx; ENTRY; + /* only configure qsd for MDT & OST */ + type = server_name2index(svname, &idx, NULL); + if (type != LDD_F_SV_TYPE_MDT && type != LDD_F_SV_TYPE_OST) + RETURN(NULL); + /* allocate qsd instance */ OBD_ALLOC_PTR(qsd); if (qsd == NULL) RETURN(ERR_PTR(-ENOMEM)); + /* generic initializations */ cfs_rwlock_init(&qsd->qsd_lock); CFS_INIT_LIST_HEAD(&qsd->qsd_link); + thread_set_flags(&qsd->qsd_upd_thread, SVC_STOPPED); + cfs_waitq_init(&qsd->qsd_upd_thread.t_ctl_waitq); + CFS_INIT_LIST_HEAD(&qsd->qsd_upd_list); + cfs_spin_lock_init(&qsd->qsd_adjust_lock); + CFS_INIT_LIST_HEAD(&qsd->qsd_adjust_list); + qsd->qsd_prepared = false; + qsd->qsd_started = false; + /* copy service name */ strncpy(qsd->qsd_svname, svname, MAX_OBD_NAME); @@ -378,6 +516,7 @@ struct qsd_instance *qsd_init(const struct lu_env *env, char *svname, svname, rc); GOTO(out, rc); } + EXIT; out: if (rc) { qsd_fini(env, qsd); @@ -404,15 +543,29 @@ EXPORT_SYMBOL(qsd_init); */ int qsd_prepare(const struct lu_env *env, struct qsd_instance *qsd) { - int rc, qtype; + struct qsd_thread_info *qti = qsd_info(env); + int qtype, rc = 0; ENTRY; LASSERT(qsd != NULL); + cfs_read_lock(&qsd->qsd_lock); + if (qsd->qsd_prepared) { + CERROR("%s: qsd instance already prepared\n", qsd->qsd_svname); + rc = -EALREADY; + } + cfs_read_unlock(&qsd->qsd_lock); + if (rc) + RETURN(rc); + /* Record whether this qsd instance is managing quota enforcement for a * MDT (i.e. inode quota) or OST (block quota) */ - if (lu_device_is_md(qsd->qsd_dev->dd_lu_dev.ld_site->ls_top_dev)) + if (lu_device_is_md(qsd->qsd_dev->dd_lu_dev.ld_site->ls_top_dev)) { qsd->qsd_is_md = true; + qsd->qsd_sync_threshold = LQUOTA_LEAST_QUNIT(LQUOTA_RES_MD); + } else { + qsd->qsd_sync_threshold = LQUOTA_LEAST_QUNIT(LQUOTA_RES_DT); + } /* look-up on-disk directory for the quota slave */ qsd->qsd_root = lquota_disk_dir_find_create(env, qsd->qsd_dev, NULL, @@ -432,10 +585,96 @@ int qsd_prepare(const struct lu_env *env, struct qsd_instance *qsd) RETURN(rc); } + /* pools successfully setup, mark the qsd as prepared */ + cfs_write_lock(&qsd->qsd_lock); + qsd->qsd_prepared = true; + cfs_write_unlock(&qsd->qsd_lock); + + /* start reintegration thread for each type, if required */ + for (qtype = USRQUOTA; qtype < MAXQUOTAS; qtype++) { + struct qsd_qtype_info *qqi = qsd->qsd_type_array[qtype]; + + rc = qsd_start_reint_thread(qqi); + if (rc) { + CERROR("%s: failed to start reint thread for type %s " + "(%d)\n", qsd->qsd_svname, QTYPE_NAME(qtype), + rc); + RETURN(rc); + } + } + + /* start writeback thread */ + rc = qsd_start_upd_thread(qsd); + if (rc) { + CERROR("%s: failed to start writeback thread (%d)\n", + qsd->qsd_svname, rc); + RETURN(rc); + } + + /* generate osp name */ + rc = tgt_name2ospname((char *)qsd->qsd_svname, qti->qti_buf); + if (rc) { + CERROR("%s: failed to generate ospname (%d)\n", + qsd->qsd_svname, rc); + RETURN(rc); + } + + /* the connection callback will start the reintegration + * procedure if quota is enabled */ + rc = lustre_register_osp_item(qti->qti_buf, &qsd->qsd_exp, + qsd_conn_callback, (void *)qsd); + if (rc) { + CERROR("%s: fail to get connection to master (%d)\n", + qsd->qsd_svname, rc); + RETURN(rc); + } + RETURN(0); } EXPORT_SYMBOL(qsd_prepare); +/* + * Start a qsd instance. This will complete the last step of the reintegration + * procedure as soon as possible (provided that the master is reachable). + * This should be called when recovery has been completed and quota should now + * be enforced on every operations. + * + * \param env - the environment passed by the caller + * \param qsd - is the qsd instance associated with the osd device to start + */ +int qsd_start(const struct lu_env *env, struct qsd_instance *qsd) +{ + int type, rc = 0; + ENTRY; + + cfs_write_lock(&qsd->qsd_lock); + if (!qsd->qsd_prepared) { + CERROR("%s: can't start qsd instance since it was properly " + "initialized\n", qsd->qsd_svname); + rc = -EFAULT; + } else if (qsd->qsd_started) { + CERROR("%s: qsd instance already started\n", qsd->qsd_svname); + rc = -EALREADY; + } else { + /* notify that the qsd_instance is now started */ + qsd->qsd_started = true; + } + cfs_write_unlock(&qsd->qsd_lock); + + if (rc) + RETURN(rc); + + /* Trigger the 3rd step of reintegration: If usage > granted, acquire + * up to usage; If usage < granted, release down to usage. */ + for (type = USRQUOTA; type < MAXQUOTAS; type++) { + struct qsd_qtype_info *qqi = qsd->qsd_type_array[type]; + cfs_waitq_signal(&qqi->qqi_reint_thread.t_ctl_waitq); + } + + RETURN(rc); +} +EXPORT_SYMBOL(qsd_start); + void lustre_register_quota_process_config(int (*qpc)(struct lustre_cfg *lcfg)); /* @@ -443,9 +682,16 @@ void lustre_register_quota_process_config(int (*qpc)(struct lustre_cfg *lcfg)); */ int qsd_glb_init(void) { + int rc; + + rc = lu_kmem_init(qsd_caches); + if (rc) + return rc; + qsd_key_init_generic(&qsd_thread_key, NULL); lu_context_key_register(&qsd_thread_key); lustre_register_quota_process_config(qsd_process_config); + return 0; } @@ -455,5 +701,6 @@ int qsd_glb_init(void) void qsd_glb_fini(void) { lustre_register_quota_process_config(NULL); + lu_kmem_fini(qsd_caches); lu_context_key_degister(&qsd_thread_key); }