/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; if not, write to the
 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
 * Boston, MA 02111-1307, USA
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2012, 2017, Intel Corporation.
 */
/*
 * lustre/target/tgt_main.c
 *
 * Lustre Unified Target main initialization code
 *
 * Author: Mikhail Pershin <mike.pershin@intel.com>
 */
34 #define DEBUG_SUBSYSTEM S_CLASS
37 #include "tgt_internal.h"
38 #include "../ptlrpc/ptlrpc_internal.h"
40 /* This must be longer than the longest string below */
41 #define SYNC_STATES_MAXLEN 16
42 static const char * const sync_lock_cancel_states[] = {
43 [SYNC_LOCK_CANCEL_NEVER] = "never",
44 [SYNC_LOCK_CANCEL_BLOCKING] = "blocking",
45 [SYNC_LOCK_CANCEL_ALWAYS] = "always",
49 * Show policy for handling dirty data under a lock being cancelled.
51 * \param[in] kobj sysfs kobject
52 * \param[in] attr sysfs attribute
53 * \param[in] buf buffer for data
55 * \retval 0 and buffer filled with data on success
56 * \retval negative value on error
58 ssize_t sync_lock_cancel_show(struct kobject *kobj,
59 struct attribute *attr, char *buf)
61 struct obd_device *obd = container_of(kobj, struct obd_device,
63 struct lu_target *tgt = obd->u.obt.obt_lut;
65 return sprintf(buf, "%s\n",
66 sync_lock_cancel_states[tgt->lut_sync_lock_cancel]);
68 EXPORT_SYMBOL(sync_lock_cancel_show);
71 * Change policy for handling dirty data under a lock being cancelled.
73 * This variable defines what action target takes upon lock cancel
74 * There are three possible modes:
75 * 1) never - never do sync upon lock cancel. This can lead to data
76 * inconsistencies if both the OST and client crash while writing a file
77 * that is also concurrently being read by another client. In these cases,
78 * this may allow the file data to "rewind" to an earlier state.
79 * 2) blocking - do sync only if there is blocking lock, e.g. if another
80 * client is trying to access this same object
81 * 3) always - do sync always
83 * \param[in] kobj kobject
84 * \param[in] attr attribute to show
85 * \param[in] buf buffer for data
86 * \param[in] count buffer size
88 * \retval \a count on success
89 * \retval negative value on error
91 ssize_t sync_lock_cancel_store(struct kobject *kobj, struct attribute *attr,
92 const char *buffer, size_t count)
94 struct obd_device *obd = container_of(kobj, struct obd_device,
96 struct lu_target *tgt = obd->u.obt.obt_lut;
98 enum tgt_sync_lock_cancel slc;
100 if (count == 0 || count >= SYNC_STATES_MAXLEN)
103 for (slc = 0; slc < ARRAY_SIZE(sync_lock_cancel_states); slc++) {
104 if (strcmp(buffer, sync_lock_cancel_states[slc]) == 0) {
110 /* Legacy numeric codes */
112 int rc = kstrtoint(buffer, 0, &val);
117 if (val < 0 || val > 2)
120 spin_lock(&tgt->lut_flags_lock);
121 tgt->lut_sync_lock_cancel = val;
122 spin_unlock(&tgt->lut_flags_lock);
125 EXPORT_SYMBOL(sync_lock_cancel_store);
126 LUSTRE_RW_ATTR(sync_lock_cancel);
129 * Show maximum number of Filter Modification Data (FMD) maintained.
131 * \param[in] kobj kobject
132 * \param[in] attr attribute to show
133 * \param[in] buf buffer for data
135 * \retval 0 and buffer filled with data on success
136 * \retval negative value on error
138 ssize_t tgt_fmd_count_show(struct kobject *kobj, struct attribute *attr,
141 struct obd_device *obd = container_of(kobj, struct obd_device,
143 struct lu_target *lut = obd->u.obt.obt_lut;
145 return sprintf(buf, "%u\n", lut->lut_fmd_max_num);
149 * Change number of FMDs maintained by target.
151 * This defines how large the list of FMDs can be.
153 * \param[in] kobj kobject
154 * \param[in] attr attribute to show
155 * \param[in] buf buffer for data
156 * \param[in] count buffer size
158 * \retval \a count on success
159 * \retval negative value on error
161 ssize_t tgt_fmd_count_store(struct kobject *kobj, struct attribute *attr,
162 const char *buffer, size_t count)
164 struct obd_device *obd = container_of(kobj, struct obd_device,
166 struct lu_target *lut = obd->u.obt.obt_lut;
169 rc = kstrtoint(buffer, 0, &val);
173 if (val < 1 || val > 65536)
176 lut->lut_fmd_max_num = val;
180 LUSTRE_RW_ATTR(tgt_fmd_count);
183 * Show the maximum age of FMD data in seconds.
185 * \param[in] kobj kobject
186 * \param[in] attr attribute to show
187 * \param[in] buf buffer for data
189 * \retval 0 and buffer filled with data on success
190 * \retval negative value on error
192 ssize_t tgt_fmd_seconds_show(struct kobject *kobj, struct attribute *attr,
195 struct obd_device *obd = container_of(kobj, struct obd_device,
197 struct lu_target *lut = obd->u.obt.obt_lut;
199 return sprintf(buf, "%lld\n", lut->lut_fmd_max_age);
203 * Set the maximum age of FMD data in seconds.
205 * This defines how long FMD data stays in the FMD list.
207 * \param[in] kobj kobject
208 * \param[in] attr attribute to show
209 * \param[in] buf buffer for data
210 * \param[in] count buffer size
212 * \retval \a count on success
213 * \retval negative number on error
215 ssize_t tgt_fmd_seconds_store(struct kobject *kobj, struct attribute *attr,
216 const char *buffer, size_t count)
218 struct obd_device *obd = container_of(kobj, struct obd_device,
220 struct lu_target *lut = obd->u.obt.obt_lut;
224 rc = kstrtoll(buffer, 0, &val);
228 if (val < 1 || val > 65536) /* ~ 18 hour max */
231 lut->lut_fmd_max_age = val;
235 LUSTRE_RW_ATTR(tgt_fmd_seconds);
237 /* These two aliases are old names and kept for compatibility, they were
238 * changed to 'tgt_fmd_count' and 'tgt_fmd_seconds'.
239 * This change was made in Lustre 2.13, so these aliases can be removed
240 * when back compatibility is not needed with any Lustre version prior 2.13
242 static struct lustre_attr tgt_fmd_count_compat = __ATTR(client_cache_count,
243 0644, tgt_fmd_count_show, tgt_fmd_count_store);
244 static struct lustre_attr tgt_fmd_seconds_compat = __ATTR(client_cache_seconds,
245 0644, tgt_fmd_seconds_show, tgt_fmd_seconds_store);
247 static const struct attribute *tgt_attrs[] = {
248 &lustre_attr_sync_lock_cancel.attr,
249 &lustre_attr_tgt_fmd_count.attr,
250 &lustre_attr_tgt_fmd_seconds.attr,
251 &tgt_fmd_count_compat.attr,
252 &tgt_fmd_seconds_compat.attr,
256 int tgt_tunables_init(struct lu_target *lut)
260 rc = sysfs_create_files(&lut->lut_obd->obd_kset.kobj, tgt_attrs);
262 lut->lut_attrs = tgt_attrs;
265 EXPORT_SYMBOL(tgt_tunables_init);
267 void tgt_tunables_fini(struct lu_target *lut)
269 if (lut->lut_attrs) {
270 sysfs_remove_files(&lut->lut_obd->obd_kset.kobj,
272 lut->lut_attrs = NULL;
275 EXPORT_SYMBOL(tgt_tunables_fini);
278 * Save cross-MDT lock in lut_slc_locks.
280 * Lock R/W count is not saved, but released in unlock (not canceled remotely),
281 * instead only a refcount is taken, so that the remote MDT where the object
282 * resides can detect conflict with this lock there.
285 * \param lock cross-MDT lock to save
286 * \param transno when the transaction with this transno is committed, this lock
289 void tgt_save_slc_lock(struct lu_target *lut, struct ldlm_lock *lock,
292 spin_lock(&lut->lut_slc_locks_guard);
293 lock_res_and_lock(lock);
294 if (ldlm_is_cbpending(lock)) {
295 /* if it was canceld by server, don't save, because remote MDT
296 * will do Sync-on-Cancel. */
299 lock->l_transno = transno;
300 /* if this lock is in the list already, there are two operations
301 * both use this lock, and save it after use, so for the second
302 * one, just put the refcount. */
303 if (list_empty(&lock->l_slc_link))
304 list_add_tail(&lock->l_slc_link, &lut->lut_slc_locks);
308 unlock_res_and_lock(lock);
309 spin_unlock(&lut->lut_slc_locks_guard);
311 EXPORT_SYMBOL(tgt_save_slc_lock);
314 * Discard cross-MDT lock from lut_slc_locks.
316 * This is called upon BAST, just remove lock from lut_slc_locks and put lock
317 * refcount. The BAST will cancel this lock.
320 * \param lock cross-MDT lock to discard
322 void tgt_discard_slc_lock(struct lu_target *lut, struct ldlm_lock *lock)
324 spin_lock(&lut->lut_slc_locks_guard);
325 lock_res_and_lock(lock);
326 /* may race with tgt_cancel_slc_locks() */
327 if (lock->l_transno != 0) {
328 LASSERT(!list_empty(&lock->l_slc_link));
329 LASSERT(ldlm_is_cbpending(lock));
330 list_del_init(&lock->l_slc_link);
334 unlock_res_and_lock(lock);
335 spin_unlock(&lut->lut_slc_locks_guard);
337 EXPORT_SYMBOL(tgt_discard_slc_lock);
340 * Cancel cross-MDT locks upon transaction commit.
342 * Remove cross-MDT locks from lut_slc_locks, cancel them and put lock refcount.
345 * \param transno transaction with this number was committed.
347 void tgt_cancel_slc_locks(struct lu_target *lut, __u64 transno)
349 struct ldlm_lock *lock, *next;
351 struct lustre_handle lockh;
353 spin_lock(&lut->lut_slc_locks_guard);
354 list_for_each_entry_safe(lock, next, &lut->lut_slc_locks,
356 lock_res_and_lock(lock);
357 LASSERT(lock->l_transno != 0);
358 if (lock->l_transno > transno) {
359 unlock_res_and_lock(lock);
362 /* ouch, another operation is using it after it's saved */
363 if (lock->l_readers != 0 || lock->l_writers != 0) {
364 unlock_res_and_lock(lock);
367 /* set CBPENDING so that this lock won't be used again */
368 ldlm_set_cbpending(lock);
370 list_move(&lock->l_slc_link, &list);
371 unlock_res_and_lock(lock);
373 spin_unlock(&lut->lut_slc_locks_guard);
375 list_for_each_entry_safe(lock, next, &list, l_slc_link) {
376 list_del_init(&lock->l_slc_link);
377 ldlm_lock2handle(lock, &lockh);
378 ldlm_cli_cancel(&lockh, LCF_ASYNC);
383 int tgt_init(const struct lu_env *env, struct lu_target *lut,
384 struct obd_device *obd, struct dt_device *dt,
385 struct tgt_opc_slice *slice, int request_fail_id,
388 struct dt_object_format dof;
392 struct tg_grants_data *tgd = &lut->lut_tgd;
393 struct obd_statfs *osfs;
401 lut->lut_bottom = dt;
402 lut->lut_last_rcvd = NULL;
403 lut->lut_client_bitmap = NULL;
404 atomic_set(&lut->lut_num_clients, 0);
405 atomic_set(&lut->lut_client_generation, 0);
406 lut->lut_reply_data = NULL;
407 lut->lut_reply_bitmap = NULL;
408 obd->u.obt.obt_lut = lut;
409 obd->u.obt.obt_magic = OBT_MAGIC;
411 /* set request handler slice and parameters */
412 lut->lut_slice = slice;
413 lut->lut_reply_fail_id = reply_fail_id;
414 lut->lut_request_fail_id = request_fail_id;
416 /* sptlrcp variables init */
417 rwlock_init(&lut->lut_sptlrpc_lock);
418 sptlrpc_rule_set_init(&lut->lut_sptlrpc_rset);
420 spin_lock_init(&lut->lut_flags_lock);
421 lut->lut_sync_lock_cancel = SYNC_LOCK_CANCEL_NEVER;
423 spin_lock_init(&lut->lut_slc_locks_guard);
424 INIT_LIST_HEAD(&lut->lut_slc_locks);
426 /* last_rcvd initialization is needed by replayable targets only */
427 if (!obd->obd_replayable)
430 /* initialize grant and statfs data in target */
431 dt_conf_get(env, lut->lut_bottom, &lut->lut_dt_conf);
434 spin_lock_init(&tgd->tgd_osfs_lock);
435 tgd->tgd_osfs_age = ktime_get_seconds() - 1000;
436 tgd->tgd_osfs_unstable = 0;
437 tgd->tgd_statfs_inflight = 0;
438 tgd->tgd_osfs_inflight = 0;
441 spin_lock_init(&tgd->tgd_grant_lock);
442 tgd->tgd_tot_dirty = 0;
443 tgd->tgd_tot_granted = 0;
444 tgd->tgd_tot_pending = 0;
445 tgd->tgd_grant_compat_disable = 0;
447 /* populate cached statfs data */
448 osfs = &tgt_th_info(env)->tti_u.osfs;
449 rc = tgt_statfs_internal(env, lut, osfs, 0, NULL);
451 CERROR("%s: can't get statfs data, rc %d\n", tgt_name(lut),
455 if (!is_power_of_2(osfs->os_bsize)) {
456 CERROR("%s: blocksize (%d) is not a power of 2\n",
457 tgt_name(lut), osfs->os_bsize);
458 GOTO(out, rc = -EPROTO);
460 tgd->tgd_blockbits = fls(osfs->os_bsize) - 1;
462 spin_lock_init(&lut->lut_translock);
463 spin_lock_init(&lut->lut_client_bitmap_lock);
465 OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
466 if (lut->lut_client_bitmap == NULL)
469 memset(&attr, 0, sizeof(attr));
470 attr.la_valid = LA_MODE;
471 attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
472 dof.dof_type = dt_mode_to_dft(S_IFREG);
474 lu_local_obj_fid(&fid, LAST_RECV_OID);
476 o = dt_find_or_create(env, lut->lut_bottom, &fid, &dof, &attr);
479 CERROR("%s: cannot open LAST_RCVD: rc = %d\n", tgt_name(lut),
484 lut->lut_last_rcvd = o;
485 rc = tgt_server_data_init(env, lut);
489 /* prepare transactions callbacks */
490 lut->lut_txn_cb.dtc_txn_start = tgt_txn_start_cb;
491 lut->lut_txn_cb.dtc_txn_stop = tgt_txn_stop_cb;
492 lut->lut_txn_cb.dtc_cookie = lut;
493 lut->lut_txn_cb.dtc_tag = LCT_DT_THREAD | LCT_MD_THREAD;
494 INIT_LIST_HEAD(&lut->lut_txn_cb.dtc_linkage);
496 dt_txn_callback_add(lut->lut_bottom, &lut->lut_txn_cb);
497 lut->lut_bottom->dd_lu_dev.ld_site->ls_tgt = lut;
499 lut->lut_fmd_max_num = LUT_FMD_MAX_NUM_DEFAULT;
500 lut->lut_fmd_max_age = LUT_FMD_MAX_AGE_DEFAULT;
502 atomic_set(&lut->lut_sync_count, 0);
504 /* reply_data is supported by MDT targets only for now */
505 if (strncmp(obd->obd_type->typ_name, LUSTRE_MDT_NAME, 3) != 0)
508 OBD_ALLOC(lut->lut_reply_bitmap,
509 LUT_REPLY_SLOTS_MAX_CHUNKS * sizeof(unsigned long *));
510 if (lut->lut_reply_bitmap == NULL)
511 GOTO(out, rc = -ENOMEM);
513 memset(&attr, 0, sizeof(attr));
514 attr.la_valid = LA_MODE;
515 attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
516 dof.dof_type = dt_mode_to_dft(S_IFREG);
518 lu_local_obj_fid(&fid, REPLY_DATA_OID);
520 o = dt_find_or_create(env, lut->lut_bottom, &fid, &dof, &attr);
523 CERROR("%s: cannot open REPLY_DATA: rc = %d\n", tgt_name(lut),
527 lut->lut_reply_data = o;
529 rc = tgt_reply_data_init(env, lut);
536 dt_txn_callback_del(lut->lut_bottom, &lut->lut_txn_cb);
538 obd->u.obt.obt_magic = 0;
539 obd->u.obt.obt_lut = NULL;
540 if (lut->lut_last_rcvd != NULL) {
541 dt_object_put(env, lut->lut_last_rcvd);
542 lut->lut_last_rcvd = NULL;
544 if (lut->lut_client_bitmap != NULL)
545 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
546 lut->lut_client_bitmap = NULL;
547 if (lut->lut_reply_data != NULL)
548 dt_object_put(env, lut->lut_reply_data);
549 lut->lut_reply_data = NULL;
550 if (lut->lut_reply_bitmap != NULL) {
551 for (i = 0; i < LUT_REPLY_SLOTS_MAX_CHUNKS; i++) {
552 if (lut->lut_reply_bitmap[i] != NULL)
553 OBD_FREE_LARGE(lut->lut_reply_bitmap[i],
554 BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
556 lut->lut_reply_bitmap[i] = NULL;
558 OBD_FREE(lut->lut_reply_bitmap,
559 LUT_REPLY_SLOTS_MAX_CHUNKS * sizeof(unsigned long *));
561 lut->lut_reply_bitmap = NULL;
564 EXPORT_SYMBOL(tgt_init);
566 void tgt_fini(const struct lu_env *env, struct lu_target *lut)
572 if (lut->lut_lsd.lsd_feature_incompat & OBD_INCOMPAT_MULTI_RPCS &&
573 atomic_read(&lut->lut_num_clients) == 0) {
574 /* Clear MULTI RPCS incompatibility flag that prevents previous
575 * Lustre versions to mount a target with reply_data file */
576 lut->lut_lsd.lsd_feature_incompat &= ~OBD_INCOMPAT_MULTI_RPCS;
577 rc = tgt_server_data_update(env, lut, 1);
579 CERROR("%s: unable to clear MULTI RPCS "
580 "incompatibility flag\n",
581 lut->lut_obd->obd_name);
584 sptlrpc_rule_set_free(&lut->lut_sptlrpc_rset);
586 if (lut->lut_reply_data != NULL)
587 dt_object_put(env, lut->lut_reply_data);
588 lut->lut_reply_data = NULL;
589 if (lut->lut_reply_bitmap != NULL) {
590 for (i = 0; i < LUT_REPLY_SLOTS_MAX_CHUNKS; i++) {
591 if (lut->lut_reply_bitmap[i] != NULL)
592 OBD_FREE_LARGE(lut->lut_reply_bitmap[i],
593 BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
595 lut->lut_reply_bitmap[i] = NULL;
597 OBD_FREE(lut->lut_reply_bitmap,
598 LUT_REPLY_SLOTS_MAX_CHUNKS * sizeof(unsigned long *));
600 lut->lut_reply_bitmap = NULL;
601 if (lut->lut_client_bitmap) {
602 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
603 lut->lut_client_bitmap = NULL;
605 if (lut->lut_last_rcvd) {
606 dt_txn_callback_del(lut->lut_bottom, &lut->lut_txn_cb);
607 dt_object_put(env, lut->lut_last_rcvd);
608 lut->lut_last_rcvd = NULL;
612 EXPORT_SYMBOL(tgt_fini);
614 static struct kmem_cache *tgt_thread_kmem;
615 static struct kmem_cache *tgt_session_kmem;
616 struct kmem_cache *tgt_fmd_kmem;
618 static struct lu_kmem_descr tgt_caches[] = {
620 .ckd_cache = &tgt_thread_kmem,
621 .ckd_name = "tgt_thread_kmem",
622 .ckd_size = sizeof(struct tgt_thread_info),
625 .ckd_cache = &tgt_session_kmem,
626 .ckd_name = "tgt_session_kmem",
627 .ckd_size = sizeof(struct tgt_session_info)
630 .ckd_cache = &tgt_fmd_kmem,
631 .ckd_name = "tgt_fmd_cache",
632 .ckd_size = sizeof(struct tgt_fmd_data)
640 /* context key constructor/destructor: tg_key_init, tg_key_fini */
641 static void *tgt_key_init(const struct lu_context *ctx,
642 struct lu_context_key *key)
644 struct tgt_thread_info *thread;
646 OBD_SLAB_ALLOC_PTR_GFP(thread, tgt_thread_kmem, GFP_NOFS);
648 return ERR_PTR(-ENOMEM);
653 static void tgt_key_fini(const struct lu_context *ctx,
654 struct lu_context_key *key, void *data)
656 struct tgt_thread_info *info = data;
657 struct thandle_exec_args *args = &info->tti_tea;
660 for (i = 0; i < args->ta_alloc_args; i++) {
661 if (args->ta_args[i] != NULL)
662 OBD_FREE_PTR(args->ta_args[i]);
665 if (args->ta_args != NULL)
666 OBD_FREE_PTR_ARRAY(args->ta_args, args->ta_alloc_args);
667 OBD_SLAB_FREE_PTR(info, tgt_thread_kmem);
670 static void tgt_key_exit(const struct lu_context *ctx,
671 struct lu_context_key *key, void *data)
673 struct tgt_thread_info *tti = data;
675 tti->tti_has_trans = 0;
676 tti->tti_mult_trans = 0;
679 /* context key: tg_thread_key */
680 struct lu_context_key tgt_thread_key = {
681 .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD,
682 .lct_init = tgt_key_init,
683 .lct_fini = tgt_key_fini,
684 .lct_exit = tgt_key_exit,
687 LU_KEY_INIT_GENERIC(tgt);
689 static void *tgt_ses_key_init(const struct lu_context *ctx,
690 struct lu_context_key *key)
692 struct tgt_session_info *session;
694 OBD_SLAB_ALLOC_PTR_GFP(session, tgt_session_kmem, GFP_NOFS);
696 return ERR_PTR(-ENOMEM);
701 static void tgt_ses_key_fini(const struct lu_context *ctx,
702 struct lu_context_key *key, void *data)
704 struct tgt_session_info *session = data;
706 OBD_SLAB_FREE_PTR(session, tgt_session_kmem);
709 /* context key: tgt_session_key */
710 struct lu_context_key tgt_session_key = {
711 .lct_tags = LCT_SERVER_SESSION,
712 .lct_init = tgt_ses_key_init,
713 .lct_fini = tgt_ses_key_fini,
715 EXPORT_SYMBOL(tgt_session_key);
717 LU_KEY_INIT_GENERIC(tgt_ses);
/*
 * this page is allocated statically when module is initializing
 * it is used to simulate data corruptions, see ost_checksum_bulk()
 * for details. as the original pages provided by the layers below
 * can be remain in the internal cache, we do not want to modify
 * them.
 */
struct page *tgt_page_to_corrupt;
728 int tgt_mod_init(void)
733 result = lu_kmem_init(tgt_caches);
737 tgt_page_to_corrupt = alloc_page(GFP_KERNEL);
739 tgt_key_init_generic(&tgt_thread_key, NULL);
740 lu_context_key_register_many(&tgt_thread_key, NULL);
742 tgt_ses_key_init_generic(&tgt_session_key, NULL);
743 lu_context_key_register_many(&tgt_session_key, NULL);
751 void tgt_mod_exit(void)
754 if (tgt_page_to_corrupt != NULL)
755 put_page(tgt_page_to_corrupt);
757 lu_context_key_degister(&tgt_thread_key);
758 lu_context_key_degister(&tgt_session_key);
761 lu_kmem_fini(tgt_caches);