4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; if not, write to the
18 * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19 * Boston, MA 02111-1307, USA
24 * Copyright (c) 2012, 2017, Intel Corporation.
27 * lustre/target/tgt_main.c
29 * Lustre Unified Target main initialization code
31 * Author: Mikhail Pershin <mike.pershin@intel.com>
34 #define DEBUG_SUBSYSTEM S_CLASS
37 #include <obd_target.h>
38 #include <obd_cksum.h>
39 #include "tgt_internal.h"
40 #include "../ptlrpc/ptlrpc_internal.h"
42 /* This must be longer than the longest string below */
43 #define SYNC_STATES_MAXLEN 16
/* Symbolic names for the sync-on-lock-cancel policy, indexed by
 * enum tgt_sync_lock_cancel; used by the show/store handlers below. */
44 static const char * const sync_lock_cancel_states[] = {
45 [SYNC_LOCK_CANCEL_NEVER] = "never",
46 [SYNC_LOCK_CANCEL_BLOCKING] = "blocking",
47 [SYNC_LOCK_CANCEL_ALWAYS] = "always",
51 * Show policy for handling dirty data under a lock being cancelled.
53 * \param[in] kobj sysfs kobject
54 * \param[in] attr sysfs attribute
55 * \param[in] buf buffer for data
57 * \retval 0 and buffer filled with data on success
58 * \retval negative value on error
60 ssize_t sync_lock_cancel_show(struct kobject *kobj,
61 struct attribute *attr, char *buf)
/* the sysfs kobject is embedded in struct obd_device; recover the
 * device and from it the unified target */
63 struct obd_device *obd = container_of(kobj, struct obd_device,
65 struct lu_target *tgt = obd2obt(obd)->obt_lut;
/* report the current policy as its symbolic name, e.g. "never\n" */
67 return sprintf(buf, "%s\n",
68 sync_lock_cancel_states[tgt->lut_sync_lock_cancel]);
70 EXPORT_SYMBOL(sync_lock_cancel_show);
73 * Change policy for handling dirty data under a lock being cancelled.
75 * This variable defines what action target takes upon lock cancel
76 * There are three possible modes:
77 * 1) never - never do sync upon lock cancel. This can lead to data
78 * inconsistencies if both the OST and client crash while writing a file
79 * that is also concurrently being read by another client. In these cases,
80 * this may allow the file data to "rewind" to an earlier state.
81 * 2) blocking - do sync only if there is blocking lock, e.g. if another
82 * client is trying to access this same object
83 * 3) always - do sync always
85 * \param[in] kobj kobject
86 * \param[in] attr attribute to show
87 * \param[in] buf buffer for data
88 * \param[in] count buffer size
90 * \retval \a count on success
91 * \retval negative value on error
93 ssize_t sync_lock_cancel_store(struct kobject *kobj, struct attribute *attr,
94 const char *buffer, size_t count)
96 struct obd_device *obd = container_of(kobj, struct obd_device,
98 struct lu_target *tgt = obd2obt(obd)->obt_lut;
100 enum tgt_sync_lock_cancel slc;
/* reject empty input and anything at least as long as the longest name */
102 if (count == 0 || count >= SYNC_STATES_MAXLEN)
/* first try to match one of the symbolic names above */
105 for (slc = 0; slc < ARRAY_SIZE(sync_lock_cancel_states); slc++) {
106 if (strcmp(buffer, sync_lock_cancel_states[slc]) == 0) {
112 /* Legacy numeric codes */
114 int rc = kstrtoint(buffer, 0, &val);
/* only 0 (never), 1 (blocking) and 2 (always) are valid legacy values */
119 if (val < 0 || val > 2)
/* lut_flags_lock serializes updates to the policy field */
122 spin_lock(&tgt->lut_flags_lock);
123 tgt->lut_sync_lock_cancel = val;
124 spin_unlock(&tgt->lut_flags_lock);
127 EXPORT_SYMBOL(sync_lock_cancel_store);
128 LUSTRE_RW_ATTR(sync_lock_cancel);
131 * Show maximum number of Filter Modification Data (FMD) maintained.
133 * \param[in] kobj kobject
134 * \param[in] attr attribute to show
135 * \param[in] buf buffer for data
137 * \retval 0 and buffer filled with data on success
138 * \retval negative value on error
140 ssize_t tgt_fmd_count_show(struct kobject *kobj, struct attribute *attr,
143 struct obd_device *obd = container_of(kobj, struct obd_device,
145 struct lu_target *lut = obd2obt(obd)->obt_lut;
/* report the current cap on the number of FMD entries kept */
147 return sprintf(buf, "%u\n", lut->lut_fmd_max_num);
151 * Change number of FMDs maintained by target.
153 * This defines how large the list of FMDs can be.
155 * \param[in] kobj kobject
156 * \param[in] attr attribute to show
157 * \param[in] buf buffer for data
158 * \param[in] count buffer size
160 * \retval \a count on success
161 * \retval negative value on error
163 ssize_t tgt_fmd_count_store(struct kobject *kobj, struct attribute *attr,
164 const char *buffer, size_t count)
166 struct obd_device *obd = container_of(kobj, struct obd_device,
168 struct lu_target *lut = obd2obt(obd)->obt_lut;
171 rc = kstrtoint(buffer, 0, &val);
/* sanity-bound the limit: at least 1, at most 65536 entries */
175 if (val < 1 || val > 65536)
178 lut->lut_fmd_max_num = val;
182 LUSTRE_RW_ATTR(tgt_fmd_count);
185 * Show the maximum age of FMD data in seconds.
187 * \param[in] kobj kobject
188 * \param[in] attr attribute to show
189 * \param[in] buf buffer for data
191 * \retval 0 and buffer filled with data on success
192 * \retval negative value on error
194 ssize_t tgt_fmd_seconds_show(struct kobject *kobj, struct attribute *attr,
197 struct obd_device *obd = container_of(kobj, struct obd_device,
199 struct lu_target *lut = obd2obt(obd)->obt_lut;
/* lut_fmd_max_age is printed as a signed 64-bit seconds value */
201 return sprintf(buf, "%lld\n", lut->lut_fmd_max_age);
205 * Set the maximum age of FMD data in seconds.
207 * This defines how long FMD data stays in the FMD list.
209 * \param[in] kobj kobject
210 * \param[in] attr attribute to show
211 * \param[in] buf buffer for data
212 * \param[in] count buffer size
214 * \retval \a count on success
215 * \retval negative number on error
217 ssize_t tgt_fmd_seconds_store(struct kobject *kobj, struct attribute *attr,
218 const char *buffer, size_t count)
220 struct obd_device *obd = container_of(kobj, struct obd_device,
222 struct lu_target *lut = obd2obt(obd)->obt_lut;
226 rc = kstrtoll(buffer, 0, &val);
230 if (val < 1 || val > 65536) /* ~ 18 hour max */
233 lut->lut_fmd_max_age = val;
237 LUSTRE_RW_ATTR(tgt_fmd_seconds);
239 /* These two aliases are old names and kept for compatibility, they were
240 * changed to 'tgt_fmd_count' and 'tgt_fmd_seconds'.
241 * This change was made in Lustre 2.13, so these aliases can be removed
242 * when back compatibility is not needed with any Lustre version prior 2.13
244 static struct lustre_attr tgt_fmd_count_compat = __ATTR(client_cache_count,
245 0644, tgt_fmd_count_show, tgt_fmd_count_store);
246 static struct lustre_attr tgt_fmd_seconds_compat = __ATTR(client_cache_seconds,
247 0644, tgt_fmd_seconds_show, tgt_fmd_seconds_store);
/* sysfs attributes installed for every target by tgt_tunables_init() */
249 static const struct attribute *tgt_attrs[] = {
250 &lustre_attr_sync_lock_cancel.attr,
251 &lustre_attr_tgt_fmd_count.attr,
252 &lustre_attr_tgt_fmd_seconds.attr,
253 &tgt_fmd_count_compat.attr,
254 &tgt_fmd_seconds_compat.attr,
259 * Decide which checksums both client and OST support, possibly forcing
260 * the use of T10PI checksums if the hardware supports this.
262 * The clients that have no T10-PI RPC checksum support will use the same
263 * mechanism to select checksum type as before, and will not be affected by
264 * the following logic.
266 * For the clients that have T10-PI RPC checksum support:
268 * If the target supports T10-PI feature and T10-PI checksum is enforced,
269 * clients will have no other choice for RPC checksum type other than using
270 * the T10PI checksum type. This is useful for enforcing end-to-end integrity
271 * in the whole system.
273 * If the target doesn't support T10-PI feature and T10-PI checksum is
274 * enforced, together with other checksum with reasonably good speeds (e.g.
275 * crc32, crc32c, adler, etc.), all T10-PI checksum types understood by the
276 * client (t10ip512, t10ip4K, t10crc512, t10crc4K) will be added to the
277 * available checksum types, regardless of the speeds of T10-PI checksums.
278 * This is useful for testing T10-PI checksum of RPC.
280 * If the target supports T10-PI feature and T10-PI checksum is NOT enforced,
281 * the corresponding T10-PI checksum type will be added to the checksum type
282 * list, regardless of the speed of the T10-PI checksum. This provides clients
283 * the flexibility to choose whether to enable end-to-end integrity or not.
285 * If the target does NOT support T10-PI feature and T10-PI checksum is NOT
286 * enforced, together with other checksums with reasonably good speeds,
287 * all the T10-PI checksum types with good speeds will be added into the
288 * checksum type list. Note that a T10-PI checksum type with a speed worse
289 * than half of Adler will NOT be added as an option. In this circumstance,
290 * T10-PI checksum types have the same behavior like other normal checksum
293 void tgt_mask_cksum_types(struct lu_target *lut, enum cksum_types *cksum_types)
295 bool enforce = lut->lut_cksum_t10pi_enforce;
296 enum cksum_types tgt_t10_cksum_type;
297 enum cksum_types client_t10_types = *cksum_types & OBD_CKSUM_T10_ALL;
298 enum cksum_types server_t10_types;
301 * The client set in ocd_cksum_types the checksum types it
302 * supports. We have to mask off the algorithms that we don't
303 * support. T10PI checksum types will be added later.
305 *cksum_types &= (lut->lut_cksum_types_supported & ~OBD_CKSUM_T10_ALL);
306 server_t10_types = lut->lut_cksum_types_supported & OBD_CKSUM_T10_ALL;
307 tgt_t10_cksum_type = lut->lut_dt_conf.ddp_t10_cksum_type;
309 /* Quick exit if no T10-PI support on client */
310 if (!client_t10_types)
314 * This OST has NO T10-PI feature. Add all supported T10-PI checksums
315 * as options if T10-PI checksum is enforced. If the T10-PI checksum is
316 * not enforced, only add them as options when speed is good.
318 if (tgt_t10_cksum_type == 0) {
320 * Server allows all T10PI checksums, and server_t10_types
321 * include quick ones.
324 *cksum_types |= client_t10_types;
326 *cksum_types |= client_t10_types & server_t10_types;
331 * This OST has T10-PI feature. Disable all other checksum types if
332 * T10-PI checksum is enforced. If the T10-PI checksum is not enforced,
333 * add the checksum type as an option.
335 if (client_t10_types & tgt_t10_cksum_type) {
/* enforced: the hardware-backed T10-PI type becomes the only choice */
337 *cksum_types = tgt_t10_cksum_type;
339 *cksum_types |= tgt_t10_cksum_type;
342 EXPORT_SYMBOL(tgt_mask_cksum_types);
/* Install the common target sysfs tunables under the device's kset. */
344 int tgt_tunables_init(struct lu_target *lut)
348 rc = sysfs_create_files(&lut->lut_obd->obd_kset.kobj, tgt_attrs);
/* remember what was registered so tgt_tunables_fini() can remove it */
350 lut->lut_attrs = tgt_attrs;
353 EXPORT_SYMBOL(tgt_tunables_init);
/* Remove the sysfs files installed by tgt_tunables_init(), if any. */
355 void tgt_tunables_fini(struct lu_target *lut)
357 if (lut->lut_attrs) {
358 sysfs_remove_files(&lut->lut_obd->obd_kset.kobj,
360 lut->lut_attrs = NULL;
363 EXPORT_SYMBOL(tgt_tunables_fini);
366 * Save cross-MDT lock in lut_slc_locks.
368 * Lock R/W count is not saved, but released in unlock (not canceled remotely),
369 * instead only a refcount is taken, so that the remote MDT where the object
370 * resides can detect conflict with this lock there.
373 * \param lock cross-MDT lock to save
374 * \param transno when the transaction with this transno is committed, this lock
377 void tgt_save_slc_lock(struct lu_target *lut, struct ldlm_lock *lock,
/* take both the list guard and the lock's resource lock so the
 * CBPENDING check and the list insertion are atomic */
380 spin_lock(&lut->lut_slc_locks_guard);
381 lock_res_and_lock(lock);
382 if (ldlm_is_cbpending(lock)) {
383 /* if it was cancelled by server, don't save, because remote MDT
384 * will do Sync-on-Cancel. */
387 lock->l_transno = transno;
388 /* if this lock is in the list already, there are two operations
389 * both use this lock, and save it after use, so for the second
390 * one, just put the refcount. */
391 if (list_empty(&lock->l_slc_link))
392 list_add_tail(&lock->l_slc_link, &lut->lut_slc_locks);
396 unlock_res_and_lock(lock);
397 spin_unlock(&lut->lut_slc_locks_guard);
399 EXPORT_SYMBOL(tgt_save_slc_lock);
402 * Discard cross-MDT lock from lut_slc_locks.
404 * This is called upon BAST, just remove lock from lut_slc_locks and put lock
405 * refcount. The BAST will cancel this lock.
408 * \param lock cross-MDT lock to discard
410 void tgt_discard_slc_lock(struct lu_target *lut, struct ldlm_lock *lock)
412 spin_lock(&lut->lut_slc_locks_guard);
413 lock_res_and_lock(lock);
414 /* may race with tgt_cancel_slc_locks() */
/* non-zero l_transno means the lock is still saved in lut_slc_locks */
415 if (lock->l_transno != 0) {
416 LASSERT(!list_empty(&lock->l_slc_link));
417 LASSERT(ldlm_is_cbpending(lock));
418 list_del_init(&lock->l_slc_link);
422 unlock_res_and_lock(lock);
423 spin_unlock(&lut->lut_slc_locks_guard);
425 EXPORT_SYMBOL(tgt_discard_slc_lock);
428 * Cancel cross-MDT locks upon transaction commit.
430 * Remove cross-MDT locks from lut_slc_locks, cancel them and put lock refcount.
433 * \param transno transaction with this number was committed.
435 void tgt_cancel_slc_locks(struct lu_target *lut, __u64 transno)
437 struct ldlm_lock *lock, *next;
439 struct lustre_handle lockh;
441 spin_lock(&lut->lut_slc_locks_guard);
442 list_for_each_entry_safe(lock, next, &lut->lut_slc_locks,
444 lock_res_and_lock(lock);
445 LASSERT(lock->l_transno != 0);
/* locks with a transno beyond the committed one are kept for later */
446 if (lock->l_transno > transno) {
447 unlock_res_and_lock(lock);
450 /* ouch, another operation is using it after it's saved */
451 if (lock->l_readers != 0 || lock->l_writers != 0) {
452 unlock_res_and_lock(lock);
455 /* set CBPENDING so that this lock won't be used again */
456 ldlm_set_cbpending(lock);
458 list_move(&lock->l_slc_link, &list);
459 unlock_res_and_lock(lock);
461 spin_unlock(&lut->lut_slc_locks_guard);
/* cancel the collected locks outside of lut_slc_locks_guard */
463 list_for_each_entry_safe(lock, next, &list, l_slc_link) {
464 list_del_init(&lock->l_slc_link);
465 ldlm_lock2handle(lock, &lockh);
466 ldlm_cli_cancel(&lockh, LCF_ASYNC);
/* Initialize a unified target: wire it to the bottom dt device, set up
 * grant/statfs state, open LAST_RCVD and (for MDTs) REPLY_DATA, and
 * register transaction callbacks. Returns 0 on success, negative errno
 * on failure with all partially-initialized state torn down. */
471 int tgt_init(const struct lu_env *env, struct lu_target *lut,
472 struct obd_device *obd, struct dt_device *dt,
473 struct tgt_opc_slice *slice, int request_fail_id,
476 struct dt_object_format dof;
480 struct tg_grants_data *tgd = &lut->lut_tgd;
481 struct obd_statfs *osfs;
482 struct obd_device_target *obt;
490 lut->lut_bottom = dt;
491 lut->lut_last_rcvd = NULL;
492 lut->lut_client_bitmap = NULL;
493 atomic_set(&lut->lut_num_clients, 0);
494 atomic_set(&lut->lut_client_generation, 0);
495 lut->lut_reply_data = NULL;
496 lut->lut_reply_bitmap = NULL;
497 obt = obd_obt_init(obd);
500 /* set request handler slice and parameters */
501 lut->lut_slice = slice;
502 lut->lut_reply_fail_id = reply_fail_id;
503 lut->lut_request_fail_id = request_fail_id;
505 /* sptlrpc variables init */
506 rwlock_init(&lut->lut_sptlrpc_lock);
507 sptlrpc_rule_set_init(&lut->lut_sptlrpc_rset);
509 spin_lock_init(&lut->lut_flags_lock);
510 lut->lut_sync_lock_cancel = SYNC_LOCK_CANCEL_NEVER;
511 lut->lut_cksum_t10pi_enforce = 0;
512 lut->lut_cksum_types_supported =
513 obd_cksum_types_supported_server(obd->obd_name);
515 spin_lock_init(&lut->lut_slc_locks_guard);
516 INIT_LIST_HEAD(&lut->lut_slc_locks);
518 /* last_rcvd initialization is needed by replayable targets only */
519 if (!obd->obd_replayable)
522 /* initialize grant and statfs data in target */
523 dt_conf_get(env, lut->lut_bottom, &lut->lut_dt_conf);
/* force the cached statfs to look stale so the first query refreshes it */
526 spin_lock_init(&tgd->tgd_osfs_lock);
527 tgd->tgd_osfs_age = ktime_get_seconds() - 1000;
528 tgd->tgd_osfs_unstable = 0;
529 tgd->tgd_statfs_inflight = 0;
530 tgd->tgd_osfs_inflight = 0;
533 spin_lock_init(&tgd->tgd_grant_lock);
534 tgd->tgd_tot_dirty = 0;
535 tgd->tgd_tot_granted = 0;
536 tgd->tgd_tot_pending = 0;
537 tgd->tgd_grant_compat_disable = 0;
539 /* populate cached statfs data */
540 osfs = &tgt_th_info(env)->tti_u.osfs;
541 rc = tgt_statfs_internal(env, lut, osfs, 0, NULL);
543 CERROR("%s: can't get statfs data, rc %d\n", tgt_name(lut),
/* grant accounting below relies on power-of-two block sizes */
547 if (!is_power_of_2(osfs->os_bsize)) {
548 CERROR("%s: blocksize (%d) is not a power of 2\n",
549 tgt_name(lut), osfs->os_bsize);
550 GOTO(out, rc = -EPROTO);
552 tgd->tgd_blockbits = fls(osfs->os_bsize) - 1;
554 spin_lock_init(&lut->lut_translock);
555 spin_lock_init(&lut->lut_client_bitmap_lock);
/* one bit per possible client slot */
557 OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
558 if (lut->lut_client_bitmap == NULL)
561 memset(&attr, 0, sizeof(attr));
562 attr.la_valid = LA_MODE;
563 attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
564 dof.dof_type = dt_mode_to_dft(S_IFREG);
566 lu_local_obj_fid(&fid, LAST_RECV_OID);
568 o = dt_find_or_create(env, lut->lut_bottom, &fid, &dof, &attr);
571 CERROR("%s: cannot open LAST_RCVD: rc = %d\n", tgt_name(lut),
576 lut->lut_last_rcvd = o;
577 rc = tgt_server_data_init(env, lut);
581 /* prepare transaction callbacks */
582 lut->lut_txn_cb.dtc_txn_start = tgt_txn_start_cb;
583 lut->lut_txn_cb.dtc_txn_stop = tgt_txn_stop_cb;
584 lut->lut_txn_cb.dtc_cookie = lut;
585 lut->lut_txn_cb.dtc_tag = LCT_DT_THREAD | LCT_MD_THREAD;
586 INIT_LIST_HEAD(&lut->lut_txn_cb.dtc_linkage);
588 dt_txn_callback_add(lut->lut_bottom, &lut->lut_txn_cb);
589 lut->lut_bottom->dd_lu_dev.ld_site->ls_tgt = lut;
591 lut->lut_fmd_max_num = LUT_FMD_MAX_NUM_DEFAULT;
592 lut->lut_fmd_max_age = LUT_FMD_MAX_AGE_DEFAULT;
594 atomic_set(&lut->lut_sync_count, 0);
596 /* reply_data is supported by MDT targets only for now */
597 if (strncmp(obd->obd_type->typ_name, LUSTRE_MDT_NAME, 3) != 0)
600 OBD_ALLOC(lut->lut_reply_bitmap,
601 LUT_REPLY_SLOTS_MAX_CHUNKS * sizeof(unsigned long *));
602 if (lut->lut_reply_bitmap == NULL)
603 GOTO(out, rc = -ENOMEM);
605 memset(&attr, 0, sizeof(attr));
606 attr.la_valid = LA_MODE;
607 attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
608 dof.dof_type = dt_mode_to_dft(S_IFREG);
610 lu_local_obj_fid(&fid, REPLY_DATA_OID);
612 o = dt_find_or_create(env, lut->lut_bottom, &fid, &dof, &attr);
615 CERROR("%s: cannot open REPLY_DATA: rc = %d\n", tgt_name(lut),
619 lut->lut_reply_data = o;
621 rc = tgt_reply_data_init(env, lut);
/* error path: undo everything set up above, in reverse order */
628 dt_txn_callback_del(lut->lut_bottom, &lut->lut_txn_cb);
630 obd2obt(obd)->obt_lut = NULL;
631 obd2obt(obd)->obt_magic = 0;
632 if (lut->lut_last_rcvd != NULL) {
633 dt_object_put(env, lut->lut_last_rcvd);
634 lut->lut_last_rcvd = NULL;
636 if (lut->lut_client_bitmap != NULL)
637 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
638 lut->lut_client_bitmap = NULL;
639 if (lut->lut_reply_data != NULL)
640 dt_object_put(env, lut->lut_reply_data);
641 lut->lut_reply_data = NULL;
642 if (lut->lut_reply_bitmap != NULL) {
643 for (i = 0; i < LUT_REPLY_SLOTS_MAX_CHUNKS; i++) {
644 if (lut->lut_reply_bitmap[i] != NULL)
645 OBD_FREE_LARGE(lut->lut_reply_bitmap[i],
646 BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
648 lut->lut_reply_bitmap[i] = NULL;
650 OBD_FREE(lut->lut_reply_bitmap,
651 LUT_REPLY_SLOTS_MAX_CHUNKS * sizeof(unsigned long *));
653 lut->lut_reply_bitmap = NULL;
656 EXPORT_SYMBOL(tgt_init);
/* Tear down a unified target: release reply data, bitmaps, client bitmap
 * and LAST_RCVD state created by tgt_init(). */
658 void tgt_fini(const struct lu_env *env, struct lu_target *lut)
664 if (lut->lut_lsd.lsd_feature_incompat & OBD_INCOMPAT_MULTI_RPCS &&
665 atomic_read(&lut->lut_num_clients) == 0) {
666 /* Clear MULTI RPCS incompatibility flag that prevents previous
667 * Lustre versions from mounting a target with reply_data file */
668 lut->lut_lsd.lsd_feature_incompat &= ~OBD_INCOMPAT_MULTI_RPCS;
669 rc = tgt_server_data_update(env, lut, 1);
671 CERROR("%s: unable to clear MULTI RPCS "
672 "incompatibility flag\n",
673 lut->lut_obd->obd_name);
676 sptlrpc_rule_set_free(&lut->lut_sptlrpc_rset);
678 if (lut->lut_reply_data != NULL)
679 dt_object_put(env, lut->lut_reply_data);
680 lut->lut_reply_data = NULL;
681 if (lut->lut_reply_bitmap != NULL) {
682 for (i = 0; i < LUT_REPLY_SLOTS_MAX_CHUNKS; i++) {
683 if (lut->lut_reply_bitmap[i] != NULL)
684 OBD_FREE_LARGE(lut->lut_reply_bitmap[i],
685 BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
687 lut->lut_reply_bitmap[i] = NULL;
689 OBD_FREE(lut->lut_reply_bitmap,
690 LUT_REPLY_SLOTS_MAX_CHUNKS * sizeof(unsigned long *));
692 lut->lut_reply_bitmap = NULL;
693 if (lut->lut_client_bitmap) {
694 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
695 lut->lut_client_bitmap = NULL;
/* LAST_RCVD presence implies the txn callback was registered */
697 if (lut->lut_last_rcvd) {
698 dt_txn_callback_del(lut->lut_bottom, &lut->lut_txn_cb);
699 dt_object_put(env, lut->lut_last_rcvd);
700 lut->lut_last_rcvd = NULL;
704 EXPORT_SYMBOL(tgt_fini);
706 static struct kmem_cache *tgt_thread_kmem;
707 static struct kmem_cache *tgt_session_kmem;
/* non-static: referenced from other target code outside this file */
708 struct kmem_cache *tgt_fmd_kmem;
/* slab caches created by tgt_mod_init() / destroyed by tgt_mod_exit() */
710 static struct lu_kmem_descr tgt_caches[] = {
712 .ckd_cache = &tgt_thread_kmem,
713 .ckd_name = "tgt_thread_kmem",
714 .ckd_size = sizeof(struct tgt_thread_info),
717 .ckd_cache = &tgt_session_kmem,
718 .ckd_name = "tgt_session_kmem",
719 .ckd_size = sizeof(struct tgt_session_info)
722 .ckd_cache = &tgt_fmd_kmem,
723 .ckd_name = "tgt_fmd_cache",
724 .ckd_size = sizeof(struct tgt_fmd_data)
732 /* context key constructor/destructor: tg_key_init, tg_key_fini */
733 static void *tgt_key_init(const struct lu_context *ctx,
734 struct lu_context_key *key)
736 struct tgt_thread_info *thread;
/* allocate per-thread info from the dedicated slab; GFP_NOFS avoids
 * re-entering the filesystem under memory pressure */
738 OBD_SLAB_ALLOC_PTR_GFP(thread, tgt_thread_kmem, GFP_NOFS);
740 return ERR_PTR(-ENOMEM);
/* Destructor for the per-thread context value allocated by tgt_key_init(). */
745 static void tgt_key_fini(const struct lu_context *ctx,
746 struct lu_context_key *key, void *data)
748 struct tgt_thread_info *info = data;
749 struct thandle_exec_args *args = &info->tti_tea;
/* free any allocated transaction-argument slots before the array itself */
752 for (i = 0; i < args->ta_alloc_args; i++) {
753 if (args->ta_args[i] != NULL)
754 OBD_FREE_PTR(args->ta_args[i]);
757 if (args->ta_args != NULL)
758 OBD_FREE_PTR_ARRAY(args->ta_args, args->ta_alloc_args);
759 OBD_SLAB_FREE_PTR(info, tgt_thread_kmem);
762 /* context key: tg_thread_key */
/* per-thread context key for MD and DT service threads */
763 struct lu_context_key tgt_thread_key = {
764 .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD,
765 .lct_init = tgt_key_init,
766 .lct_fini = tgt_key_fini,
769 LU_KEY_INIT_GENERIC(tgt);
/* Constructor for the per-RPC session context value. */
771 static void *tgt_ses_key_init(const struct lu_context *ctx,
772 struct lu_context_key *key)
774 struct tgt_session_info *session;
776 OBD_SLAB_ALLOC_PTR_GFP(session, tgt_session_kmem, GFP_NOFS);
778 return ERR_PTR(-ENOMEM);
/* Destructor for the per-RPC session context value. */
783 static void tgt_ses_key_fini(const struct lu_context *ctx,
784 struct lu_context_key *key, void *data)
786 struct tgt_session_info *session = data;
788 OBD_SLAB_FREE_PTR(session, tgt_session_kmem);
/* Per-RPC exit hook: log unexpected multi-transaction RPCs and reset the
 * session's transaction accounting for the next request. */
791 static void tgt_ses_key_exit(const struct lu_context *ctx,
792 struct lu_context_key *key, void *data)
794 struct tgt_session_info *tsi = data;
797 * Check cases when that is true to add proper
798 * handling and set mult_trans
800 if (!tsi->tsi_mult_trans && tsi->tsi_has_trans > 1)
801 CDEBUG(D_HA, "total %i transactions per RPC\n",
803 tsi->tsi_has_trans = 0;
804 tsi->tsi_mult_trans = false;
807 /* context key: tgt_session_key */
/* per-RPC session context key for server-side request handling */
808 struct lu_context_key tgt_session_key = {
809 .lct_tags = LCT_SERVER_SESSION,
810 .lct_init = tgt_ses_key_init,
811 .lct_fini = tgt_ses_key_fini,
812 .lct_exit = tgt_ses_key_exit,
814 EXPORT_SYMBOL(tgt_session_key);
816 LU_KEY_INIT_GENERIC(tgt_ses);
819 * this page is allocated statically when module is initializing
820 * it is used to simulate data corruptions, see ost_checksum_bulk()
821 * for details. as the original pages provided by the layers below
822 * can remain in the internal cache, we do not want to modify
825 struct page *tgt_page_to_corrupt;
/* Module init: create slab caches, register the target filesystem type,
 * allocate the corruption-test page and register context keys. */
827 int tgt_mod_init(void)
832 result = lu_kmem_init(tgt_caches);
836 result = lustre_tgt_register_fs();
/* undo cache setup if filesystem type registration fails */
838 lu_kmem_fini(tgt_caches);
842 tgt_page_to_corrupt = alloc_page(GFP_KERNEL);
844 tgt_key_init_generic(&tgt_thread_key, NULL);
845 lu_context_key_register_many(&tgt_thread_key, NULL);
847 tgt_ses_key_init_generic(&tgt_session_key, NULL);
848 lu_context_key_register_many(&tgt_session_key, NULL);
/* Module exit: release the corruption-test page, deregister context keys,
 * unregister the target filesystem type and destroy the slab caches. */
856 void tgt_mod_exit(void)
859 if (tgt_page_to_corrupt != NULL)
860 put_page(tgt_page_to_corrupt);
862 lu_context_key_degister(&tgt_thread_key);
863 lu_context_key_degister(&tgt_session_key);
866 lustre_tgt_unregister_fs();
868 lu_kmem_fini(tgt_caches);