Whamcloud - gitweb
LU-17662 osd-zfs: Support for ZFS 2.2.3
[fs/lustre-release.git] / lustre / target / tgt_main.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; if not, write to the
18  * Free Software Foundation, Inc., 59 Temple Place - Suite 330,
19  * Boston, MA 021110-1307, USA
20  *
21  * GPL HEADER END
22  */
23 /*
24  * Copyright (c) 2012, 2017, Intel Corporation.
25  */
26 /*
27  * lustre/target/tgt_main.c
28  *
29  * Lustre Unified Target main initialization code
30  *
31  * Author: Mikhail Pershin <mike.pershin@intel.com>
32  */
33
34 #define DEBUG_SUBSYSTEM S_CLASS
35
36 #include <obd.h>
37 #include <obd_target.h>
38 #include <obd_cksum.h>
39 #include "tgt_internal.h"
40 #include "../ptlrpc/ptlrpc_internal.h"
41
42 /* This must be longer than the longest string below */
43 #define SYNC_STATES_MAXLEN 16
44 static const char * const sync_lock_cancel_states[] = {
45         [SYNC_LOCK_CANCEL_NEVER]        = "never",
46         [SYNC_LOCK_CANCEL_BLOCKING]     = "blocking",
47         [SYNC_LOCK_CANCEL_ALWAYS]       = "always",
48 };
49
50 /**
51  * Show policy for handling dirty data under a lock being cancelled.
52  *
53  * \param[in] kobj      sysfs kobject
54  * \param[in] attr      sysfs attribute
55  * \param[in] buf       buffer for data
56  *
57  * \retval              0 and buffer filled with data on success
58  * \retval              negative value on error
59  */
60 ssize_t sync_lock_cancel_show(struct kobject *kobj,
61                               struct attribute *attr, char *buf)
62 {
63         struct obd_device *obd = container_of(kobj, struct obd_device,
64                                               obd_kset.kobj);
65         struct lu_target *tgt = obd2obt(obd)->obt_lut;
66
67         return sprintf(buf, "%s\n",
68                        sync_lock_cancel_states[tgt->lut_sync_lock_cancel]);
69 }
70 EXPORT_SYMBOL(sync_lock_cancel_show);
71
72 /**
73  * Change policy for handling dirty data under a lock being cancelled.
74  *
75  * This variable defines what action target takes upon lock cancel
76  * There are three possible modes:
77  * 1) never - never do sync upon lock cancel. This can lead to data
78  *    inconsistencies if both the OST and client crash while writing a file
79  *    that is also concurrently being read by another client. In these cases,
80  *    this may allow the file data to "rewind" to an earlier state.
81  * 2) blocking - do sync only if there is blocking lock, e.g. if another
82  *    client is trying to access this same object
83  * 3) always - do sync always
84  *
85  * \param[in] kobj      kobject
86  * \param[in] attr      attribute to show
87  * \param[in] buf       buffer for data
88  * \param[in] count     buffer size
89  *
90  * \retval              \a count on success
91  * \retval              negative value on error
92  */
93 ssize_t sync_lock_cancel_store(struct kobject *kobj, struct attribute *attr,
94                                const char *buffer, size_t count)
95 {
96         struct obd_device *obd = container_of(kobj, struct obd_device,
97                                               obd_kset.kobj);
98         struct lu_target *tgt = obd2obt(obd)->obt_lut;
99         int val = -1;
100         enum tgt_sync_lock_cancel slc;
101
102         if (count == 0 || count >= SYNC_STATES_MAXLEN)
103                 return -EINVAL;
104
105         for (slc = 0; slc < ARRAY_SIZE(sync_lock_cancel_states); slc++) {
106                 if (strcmp(buffer, sync_lock_cancel_states[slc]) == 0) {
107                         val = slc;
108                         break;
109                 }
110         }
111
112         /* Legacy numeric codes */
113         if (val == -1) {
114                 int rc = kstrtoint(buffer, 0, &val);
115                 if (rc)
116                         return rc;
117         }
118
119         if (val < 0 || val > 2)
120                 return -EINVAL;
121
122         spin_lock(&tgt->lut_flags_lock);
123         tgt->lut_sync_lock_cancel = val;
124         spin_unlock(&tgt->lut_flags_lock);
125         return count;
126 }
127 EXPORT_SYMBOL(sync_lock_cancel_store);
128 LUSTRE_RW_ATTR(sync_lock_cancel);
129
130 /**
131  * Show maximum number of Filter Modification Data (FMD) maintained.
132  *
133  * \param[in] kobj      kobject
134  * \param[in] attr      attribute to show
135  * \param[in] buf       buffer for data
136  *
137  * \retval              0 and buffer filled with data on success
138  * \retval              negative value on error
139  */
140 ssize_t tgt_fmd_count_show(struct kobject *kobj, struct attribute *attr,
141                            char *buf)
142 {
143         struct obd_device *obd = container_of(kobj, struct obd_device,
144                                               obd_kset.kobj);
145         struct lu_target *lut = obd2obt(obd)->obt_lut;
146
147         return sprintf(buf, "%u\n", lut->lut_fmd_max_num);
148 }
149
150 /**
151  * Change number of FMDs maintained by target.
152  *
153  * This defines how large the list of FMDs can be.
154  *
155  * \param[in] kobj      kobject
156  * \param[in] attr      attribute to show
157  * \param[in] buf       buffer for data
158  * \param[in] count     buffer size
159  *
160  * \retval              \a count on success
161  * \retval              negative value on error
162  */
163 ssize_t tgt_fmd_count_store(struct kobject *kobj, struct attribute *attr,
164                             const char *buffer, size_t count)
165 {
166         struct obd_device *obd = container_of(kobj, struct obd_device,
167                                               obd_kset.kobj);
168         struct lu_target *lut = obd2obt(obd)->obt_lut;
169         int val, rc;
170
171         rc = kstrtoint(buffer, 0, &val);
172         if (rc)
173                 return rc;
174
175         if (val < 1 || val > 65536)
176                 return -EINVAL;
177
178         lut->lut_fmd_max_num = val;
179
180         return count;
181 }
182 LUSTRE_RW_ATTR(tgt_fmd_count);
183
184 /**
185  * Show the maximum age of FMD data in seconds.
186  *
187  * \param[in] kobj      kobject
188  * \param[in] attr      attribute to show
189  * \param[in] buf       buffer for data
190  *
191  * \retval              0 and buffer filled with data on success
192  * \retval              negative value on error
193  */
194 ssize_t tgt_fmd_seconds_show(struct kobject *kobj, struct attribute *attr,
195                              char *buf)
196 {
197         struct obd_device *obd = container_of(kobj, struct obd_device,
198                                               obd_kset.kobj);
199         struct lu_target *lut = obd2obt(obd)->obt_lut;
200
201         return sprintf(buf, "%lld\n", lut->lut_fmd_max_age);
202 }
203
204 /**
205  * Set the maximum age of FMD data in seconds.
206  *
207  * This defines how long FMD data stays in the FMD list.
208  *
209  * \param[in] kobj      kobject
210  * \param[in] attr      attribute to show
211  * \param[in] buf       buffer for data
212  * \param[in] count     buffer size
213  *
214  * \retval              \a count on success
215  * \retval              negative number on error
216  */
217 ssize_t tgt_fmd_seconds_store(struct kobject *kobj, struct attribute *attr,
218                               const char *buffer, size_t count)
219 {
220         struct obd_device *obd = container_of(kobj, struct obd_device,
221                                               obd_kset.kobj);
222         struct lu_target *lut = obd2obt(obd)->obt_lut;
223         time64_t val;
224         int rc;
225
226         rc = kstrtoll(buffer, 0, &val);
227         if (rc)
228                 return rc;
229
230         if (val < 1 || val > 65536) /* ~ 18 hour max */
231                 return -EINVAL;
232
233         lut->lut_fmd_max_age = val;
234
235         return count;
236 }
237 LUSTRE_RW_ATTR(tgt_fmd_seconds);
238
239 /* These two aliases are old names and kept for compatibility, they were
240  * changed to 'tgt_fmd_count' and 'tgt_fmd_seconds'.
241  * This change was made in Lustre 2.13, so these aliases can be removed
242  * when back compatibility is not needed with any Lustre version prior 2.13
243  */
244 static struct lustre_attr tgt_fmd_count_compat = __ATTR(client_cache_count,
245                         0644, tgt_fmd_count_show, tgt_fmd_count_store);
246 static struct lustre_attr tgt_fmd_seconds_compat = __ATTR(client_cache_seconds,
247                         0644, tgt_fmd_seconds_show, tgt_fmd_seconds_store);
248
249 static const struct attribute *tgt_attrs[] = {
250         &lustre_attr_sync_lock_cancel.attr,
251         &lustre_attr_tgt_fmd_count.attr,
252         &lustre_attr_tgt_fmd_seconds.attr,
253         &tgt_fmd_count_compat.attr,
254         &tgt_fmd_seconds_compat.attr,
255         NULL,
256 };
257
258 /**
259  * Decide which checksums both client and OST support, possibly forcing
260  * the use of T10PI checksums if the hardware supports this.
261  *
262  * The clients that have no T10-PI RPC checksum support will use the same
263  * mechanism to select checksum type as before, and will not be affected by
264  * the following logic.
265  *
266  * For the clients that have T10-PI RPC checksum support:
267  *
268  * If the target supports T10-PI feature and T10-PI checksum is enforced,
269  * clients will have no other choice for RPC checksum type other than using
270  * the T10PI checksum type. This is useful for enforcing end-to-end integrity
271  * in the whole system.
272  *
273  * If the target doesn't support T10-PI feature and T10-PI checksum is
274  * enforced, together with other checksum with reasonably good speeds (e.g.
275  * crc32, crc32c, adler, etc.), all T10-PI checksum types understood by the
276  * client (t10ip512, t10ip4K, t10crc512, t10crc4K) will be added to the
277  * available checksum types, regardless of the speeds of T10-PI checksums.
278  * This is useful for testing T10-PI checksum of RPC.
279  *
280  * If the target supports T10-PI feature and T10-PI checksum is NOT enforced,
281  * the corresponding T10-PI checksum type will be added to the checksum type
282  * list, regardless of the speed of the T10-PI checksum. This provides clients
283  * the flexibility to choose whether to enable end-to-end integrity or not.
284  *
285  * If the target does NOT supports T10-PI feature and T10-PI checksum is NOT
286  * enforced, together with other checksums with reasonably good speeds,
287  * all the T10-PI checksum types with good speeds will be added into the
288  * checksum type list. Note that a T10-PI checksum type with a speed worse
289  * than half of Alder will NOT be added as a option. In this circumstance,
290  * T10-PI checksum types has the same behavior like other normal checksum
291  * types.
292  */
293 void tgt_mask_cksum_types(struct lu_target *lut, enum cksum_types *cksum_types)
294 {
295         bool enforce = lut->lut_cksum_t10pi_enforce;
296         enum cksum_types tgt_t10_cksum_type;
297         enum cksum_types client_t10_types = *cksum_types & OBD_CKSUM_T10_ALL;
298         enum cksum_types server_t10_types;
299
300         /*
301          * The client set in ocd_cksum_types the checksum types it
302          * supports. We have to mask off the algorithms that we don't
303          * support. T10PI checksum types will be added later.
304          */
305         *cksum_types &= (lut->lut_cksum_types_supported & ~OBD_CKSUM_T10_ALL);
306         server_t10_types = lut->lut_cksum_types_supported & OBD_CKSUM_T10_ALL;
307         tgt_t10_cksum_type = lut->lut_dt_conf.ddp_t10_cksum_type;
308
309         /* Quick exit if no T10-PI support on client */
310         if (!client_t10_types)
311                 return;
312
313         /*
314          * This OST has NO T10-PI feature. Add all supported T10-PI checksums
315          * as options if T10-PI checksum is enforced. If the T10-PI checksum is
316          * not enforced, only add them as options when speed is good.
317          */
318         if (tgt_t10_cksum_type == 0) {
319                 /*
320                  * Server allows all T10PI checksums, and server_t10_types
321                  * include quick ones.
322                  */
323                 if (enforce)
324                         *cksum_types |= client_t10_types;
325                 else
326                         *cksum_types |= client_t10_types & server_t10_types;
327                 return;
328         }
329
330         /*
331          * This OST has T10-PI feature. Disable all other checksum types if
332          * T10-PI checksum is enforced. If the T10-PI checksum is not enforced,
333          * add the checksum type as an option.
334          */
335         if (client_t10_types & tgt_t10_cksum_type) {
336                 if (enforce)
337                         *cksum_types = tgt_t10_cksum_type;
338                 else
339                         *cksum_types |= tgt_t10_cksum_type;
340         }
341 }
342 EXPORT_SYMBOL(tgt_mask_cksum_types);
343
344 int tgt_tunables_init(struct lu_target *lut)
345 {
346         int rc;
347
348         rc = sysfs_create_files(&lut->lut_obd->obd_kset.kobj, tgt_attrs);
349         if (!rc)
350                 lut->lut_attrs = tgt_attrs;
351         return rc;
352 }
353 EXPORT_SYMBOL(tgt_tunables_init);
354
355 void tgt_tunables_fini(struct lu_target *lut)
356 {
357         if (lut->lut_attrs) {
358                 sysfs_remove_files(&lut->lut_obd->obd_kset.kobj,
359                                    lut->lut_attrs);
360                 lut->lut_attrs = NULL;
361         }
362 }
363 EXPORT_SYMBOL(tgt_tunables_fini);
364
365 /*
366  * Save cross-MDT lock in lut_slc_locks.
367  *
368  * Lock R/W count is not saved, but released in unlock (not canceled remotely),
369  * instead only a refcount is taken, so that the remote MDT where the object
370  * resides can detect conflict with this lock there.
371  *
372  * \param lut target
373  * \param lock cross-MDT lock to save
374  * \param transno when the transaction with this transno is committed, this lock
375  *                can be canceled.
376  */
377 void tgt_save_slc_lock(struct lu_target *lut, struct ldlm_lock *lock,
378                        __u64 transno)
379 {
380         spin_lock(&lut->lut_slc_locks_guard);
381         lock_res_and_lock(lock);
382         if (ldlm_is_cbpending(lock)) {
383                 /* if it was canceld by server, don't save, because remote MDT
384                  * will do Sync-on-Cancel. */
385                 LDLM_LOCK_PUT(lock);
386         } else {
387                 lock->l_transno = transno;
388                 /* if this lock is in the list already, there are two operations
389                  * both use this lock, and save it after use, so for the second
390                  * one, just put the refcount. */
391                 if (list_empty(&lock->l_slc_link))
392                         list_add_tail(&lock->l_slc_link, &lut->lut_slc_locks);
393                 else
394                         LDLM_LOCK_PUT(lock);
395         }
396         unlock_res_and_lock(lock);
397         spin_unlock(&lut->lut_slc_locks_guard);
398 }
399 EXPORT_SYMBOL(tgt_save_slc_lock);
400
401 /*
402  * Discard cross-MDT lock from lut_slc_locks.
403  *
404  * This is called upon BAST, just remove lock from lut_slc_locks and put lock
405  * refcount. The BAST will cancel this lock.
406  *
407  * \param lut target
408  * \param lock cross-MDT lock to discard
409  */
410 void tgt_discard_slc_lock(struct lu_target *lut, struct ldlm_lock *lock)
411 {
412         spin_lock(&lut->lut_slc_locks_guard);
413         lock_res_and_lock(lock);
414         /* may race with tgt_cancel_slc_locks() */
415         if (lock->l_transno != 0) {
416                 LASSERT(!list_empty(&lock->l_slc_link));
417                 LASSERT(ldlm_is_cbpending(lock));
418                 list_del_init(&lock->l_slc_link);
419                 lock->l_transno = 0;
420                 LDLM_LOCK_PUT(lock);
421         }
422         unlock_res_and_lock(lock);
423         spin_unlock(&lut->lut_slc_locks_guard);
424 }
425 EXPORT_SYMBOL(tgt_discard_slc_lock);
426
427 /*
428  * Cancel cross-MDT locks upon transaction commit.
429  *
430  * Remove cross-MDT locks from lut_slc_locks, cancel them and put lock refcount.
431  *
432  * \param lut target
433  * \param transno transaction with this number was committed.
434  */
435 void tgt_cancel_slc_locks(struct lu_target *lut, __u64 transno)
436 {
437         struct ldlm_lock *lock, *next;
438         LIST_HEAD(list);
439         struct lustre_handle lockh;
440
441         spin_lock(&lut->lut_slc_locks_guard);
442         list_for_each_entry_safe(lock, next, &lut->lut_slc_locks,
443                                  l_slc_link) {
444                 lock_res_and_lock(lock);
445                 LASSERT(lock->l_transno != 0);
446                 if (lock->l_transno > transno) {
447                         unlock_res_and_lock(lock);
448                         continue;
449                 }
450                 /* ouch, another operation is using it after it's saved */
451                 if (lock->l_readers != 0 || lock->l_writers != 0) {
452                         unlock_res_and_lock(lock);
453                         continue;
454                 }
455                 /* set CBPENDING so that this lock won't be used again */
456                 ldlm_set_cbpending(lock);
457                 lock->l_transno = 0;
458                 list_move(&lock->l_slc_link, &list);
459                 unlock_res_and_lock(lock);
460         }
461         spin_unlock(&lut->lut_slc_locks_guard);
462
463         list_for_each_entry_safe(lock, next, &list, l_slc_link) {
464                 list_del_init(&lock->l_slc_link);
465                 ldlm_lock2handle(lock, &lockh);
466                 ldlm_cli_cancel(&lockh, LCF_ASYNC);
467                 LDLM_LOCK_PUT(lock);
468         }
469 }
470
471 int tgt_init(const struct lu_env *env, struct lu_target *lut,
472              struct obd_device *obd, struct dt_device *dt,
473              struct tgt_opc_slice *slice, int request_fail_id,
474              int reply_fail_id)
475 {
476         struct dt_object_format  dof;
477         struct lu_attr           attr;
478         struct lu_fid            fid;
479         struct dt_object        *o;
480         struct tg_grants_data   *tgd = &lut->lut_tgd;
481         struct obd_statfs       *osfs;
482         struct obd_device_target *obt;
483         int i, rc = 0;
484
485         ENTRY;
486
487         LASSERT(lut);
488         LASSERT(obd);
489         lut->lut_obd = obd;
490         lut->lut_bottom = dt;
491         lut->lut_last_rcvd = NULL;
492         lut->lut_client_bitmap = NULL;
493         atomic_set(&lut->lut_num_clients, 0);
494         atomic_set(&lut->lut_client_generation, 0);
495         lut->lut_reply_data = NULL;
496         lut->lut_reply_bitmap = NULL;
497         obt = obd_obt_init(obd);
498         obt->obt_lut = lut;
499
500         /* set request handler slice and parameters */
501         lut->lut_slice = slice;
502         lut->lut_reply_fail_id = reply_fail_id;
503         lut->lut_request_fail_id = request_fail_id;
504
505         /* sptlrcp variables init */
506         rwlock_init(&lut->lut_sptlrpc_lock);
507         sptlrpc_rule_set_init(&lut->lut_sptlrpc_rset);
508
509         spin_lock_init(&lut->lut_flags_lock);
510         lut->lut_sync_lock_cancel = SYNC_LOCK_CANCEL_NEVER;
511         lut->lut_cksum_t10pi_enforce = 0;
512         lut->lut_cksum_types_supported =
513                 obd_cksum_types_supported_server(obd->obd_name);
514
515         spin_lock_init(&lut->lut_slc_locks_guard);
516         INIT_LIST_HEAD(&lut->lut_slc_locks);
517
518         /* last_rcvd initialization is needed by replayable targets only */
519         if (!obd->obd_replayable)
520                 RETURN(0);
521
522         /* initialize grant and statfs data in target */
523         dt_conf_get(env, lut->lut_bottom, &lut->lut_dt_conf);
524
525         /* statfs data */
526         spin_lock_init(&tgd->tgd_osfs_lock);
527         tgd->tgd_osfs_age = ktime_get_seconds() - 1000;
528         tgd->tgd_osfs_unstable = 0;
529         tgd->tgd_statfs_inflight = 0;
530         tgd->tgd_osfs_inflight = 0;
531
532         /* grant data */
533         spin_lock_init(&tgd->tgd_grant_lock);
534         tgd->tgd_tot_dirty = 0;
535         tgd->tgd_tot_granted = 0;
536         tgd->tgd_tot_pending = 0;
537         tgd->tgd_grant_compat_disable = 0;
538
539         /* populate cached statfs data */
540         osfs = &tgt_th_info(env)->tti_u.osfs;
541         rc = tgt_statfs_internal(env, lut, osfs, 0, NULL);
542         if (rc != 0) {
543                 CERROR("%s: can't get statfs data, rc %d\n", tgt_name(lut),
544                         rc);
545                 GOTO(out, rc);
546         }
547         if (!is_power_of_2(osfs->os_bsize)) {
548                 CERROR("%s: blocksize (%d) is not a power of 2\n",
549                         tgt_name(lut), osfs->os_bsize);
550                 GOTO(out, rc = -EPROTO);
551         }
552         tgd->tgd_blockbits = fls(osfs->os_bsize) - 1;
553
554         spin_lock_init(&lut->lut_translock);
555         spin_lock_init(&lut->lut_client_bitmap_lock);
556
557         OBD_ALLOC(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
558         if (lut->lut_client_bitmap == NULL)
559                 RETURN(-ENOMEM);
560
561         memset(&attr, 0, sizeof(attr));
562         attr.la_valid = LA_MODE;
563         attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
564         dof.dof_type = dt_mode_to_dft(S_IFREG);
565
566         lu_local_obj_fid(&fid, LAST_RECV_OID);
567
568         o = dt_find_or_create(env, lut->lut_bottom, &fid, &dof, &attr);
569         if (IS_ERR(o)) {
570                 rc = PTR_ERR(o);
571                 CERROR("%s: cannot open LAST_RCVD: rc = %d\n", tgt_name(lut),
572                        rc);
573                 GOTO(out_put, rc);
574         }
575
576         lut->lut_last_rcvd = o;
577         rc = tgt_server_data_init(env, lut);
578         if (rc < 0)
579                 GOTO(out_put, rc);
580
581         /* prepare transactions callbacks */
582         lut->lut_txn_cb.dtc_txn_start = tgt_txn_start_cb;
583         lut->lut_txn_cb.dtc_txn_stop = tgt_txn_stop_cb;
584         lut->lut_txn_cb.dtc_cookie = lut;
585         lut->lut_txn_cb.dtc_tag = LCT_DT_THREAD | LCT_MD_THREAD;
586         INIT_LIST_HEAD(&lut->lut_txn_cb.dtc_linkage);
587
588         dt_txn_callback_add(lut->lut_bottom, &lut->lut_txn_cb);
589         lut->lut_bottom->dd_lu_dev.ld_site->ls_tgt = lut;
590
591         lut->lut_fmd_max_num = LUT_FMD_MAX_NUM_DEFAULT;
592         lut->lut_fmd_max_age = LUT_FMD_MAX_AGE_DEFAULT;
593
594         atomic_set(&lut->lut_sync_count, 0);
595
596         /* reply_data is supported by MDT targets only for now */
597         if (strncmp(obd->obd_type->typ_name, LUSTRE_MDT_NAME, 3) != 0)
598                 RETURN(0);
599
600         OBD_ALLOC(lut->lut_reply_bitmap,
601                   LUT_REPLY_SLOTS_MAX_CHUNKS * sizeof(unsigned long *));
602         if (lut->lut_reply_bitmap == NULL)
603                 GOTO(out, rc = -ENOMEM);
604
605         memset(&attr, 0, sizeof(attr));
606         attr.la_valid = LA_MODE;
607         attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
608         dof.dof_type = dt_mode_to_dft(S_IFREG);
609
610         lu_local_obj_fid(&fid, REPLY_DATA_OID);
611
612         o = dt_find_or_create(env, lut->lut_bottom, &fid, &dof, &attr);
613         if (IS_ERR(o)) {
614                 rc = PTR_ERR(o);
615                 CERROR("%s: cannot open REPLY_DATA: rc = %d\n", tgt_name(lut),
616                        rc);
617                 GOTO(out, rc);
618         }
619         lut->lut_reply_data = o;
620
621         rc = tgt_reply_data_init(env, lut);
622         if (rc < 0)
623                 GOTO(out, rc);
624
625         RETURN(0);
626
627 out:
628         dt_txn_callback_del(lut->lut_bottom, &lut->lut_txn_cb);
629 out_put:
630         obd2obt(obd)->obt_lut = NULL;
631         obd2obt(obd)->obt_magic = 0;
632         if (lut->lut_last_rcvd != NULL) {
633                 dt_object_put(env, lut->lut_last_rcvd);
634                 lut->lut_last_rcvd = NULL;
635         }
636         if (lut->lut_client_bitmap != NULL)
637                 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
638         lut->lut_client_bitmap = NULL;
639         if (lut->lut_reply_data != NULL)
640                 dt_object_put(env, lut->lut_reply_data);
641         lut->lut_reply_data = NULL;
642         if (lut->lut_reply_bitmap != NULL) {
643                 for (i = 0; i < LUT_REPLY_SLOTS_MAX_CHUNKS; i++) {
644                         if (lut->lut_reply_bitmap[i] != NULL)
645                                 OBD_FREE_LARGE(lut->lut_reply_bitmap[i],
646                                     BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
647                                     sizeof(long));
648                         lut->lut_reply_bitmap[i] = NULL;
649                 }
650                 OBD_FREE(lut->lut_reply_bitmap,
651                          LUT_REPLY_SLOTS_MAX_CHUNKS * sizeof(unsigned long *));
652         }
653         lut->lut_reply_bitmap = NULL;
654         return rc;
655 }
656 EXPORT_SYMBOL(tgt_init);
657
658 void tgt_fini(const struct lu_env *env, struct lu_target *lut)
659 {
660         int i;
661         int rc;
662         ENTRY;
663
664         if (lut->lut_lsd.lsd_feature_incompat & OBD_INCOMPAT_MULTI_RPCS &&
665             atomic_read(&lut->lut_num_clients) == 0) {
666                 /* Clear MULTI RPCS incompatibility flag that prevents previous
667                  * Lustre versions to mount a target with reply_data file */
668                 lut->lut_lsd.lsd_feature_incompat &= ~OBD_INCOMPAT_MULTI_RPCS;
669                 rc = tgt_server_data_update(env, lut, 1);
670                 if (rc < 0)
671                         CERROR("%s: unable to clear MULTI RPCS "
672                                "incompatibility flag\n",
673                                lut->lut_obd->obd_name);
674         }
675
676         sptlrpc_rule_set_free(&lut->lut_sptlrpc_rset);
677
678         if (lut->lut_reply_data != NULL)
679                 dt_object_put(env, lut->lut_reply_data);
680         lut->lut_reply_data = NULL;
681         if (lut->lut_reply_bitmap != NULL) {
682                 for (i = 0; i < LUT_REPLY_SLOTS_MAX_CHUNKS; i++) {
683                         if (lut->lut_reply_bitmap[i] != NULL)
684                                 OBD_FREE_LARGE(lut->lut_reply_bitmap[i],
685                                     BITS_TO_LONGS(LUT_REPLY_SLOTS_PER_CHUNK) *
686                                     sizeof(long));
687                         lut->lut_reply_bitmap[i] = NULL;
688                 }
689                 OBD_FREE(lut->lut_reply_bitmap,
690                          LUT_REPLY_SLOTS_MAX_CHUNKS * sizeof(unsigned long *));
691         }
692         lut->lut_reply_bitmap = NULL;
693         if (lut->lut_client_bitmap) {
694                 OBD_FREE(lut->lut_client_bitmap, LR_MAX_CLIENTS >> 3);
695                 lut->lut_client_bitmap = NULL;
696         }
697         if (lut->lut_last_rcvd) {
698                 dt_txn_callback_del(lut->lut_bottom, &lut->lut_txn_cb);
699                 dt_object_put(env, lut->lut_last_rcvd);
700                 lut->lut_last_rcvd = NULL;
701         }
702         EXIT;
703 }
704 EXPORT_SYMBOL(tgt_fini);
705
706 static struct kmem_cache *tgt_thread_kmem;
707 static struct kmem_cache *tgt_session_kmem;
708 struct kmem_cache *tgt_fmd_kmem;
709
710 static struct lu_kmem_descr tgt_caches[] = {
711         {
712                 .ckd_cache = &tgt_thread_kmem,
713                 .ckd_name  = "tgt_thread_kmem",
714                 .ckd_size  = sizeof(struct tgt_thread_info),
715         },
716         {
717                 .ckd_cache = &tgt_session_kmem,
718                 .ckd_name  = "tgt_session_kmem",
719                 .ckd_size  = sizeof(struct tgt_session_info)
720         },
721         {
722                 .ckd_cache = &tgt_fmd_kmem,
723                 .ckd_name  = "tgt_fmd_cache",
724                 .ckd_size  = sizeof(struct tgt_fmd_data)
725         },
726         {
727                 .ckd_cache = NULL
728         }
729 };
730
731
732 /* context key constructor/destructor: tg_key_init, tg_key_fini */
733 static void *tgt_key_init(const struct lu_context *ctx,
734                                   struct lu_context_key *key)
735 {
736         struct tgt_thread_info *thread;
737
738         OBD_SLAB_ALLOC_PTR_GFP(thread, tgt_thread_kmem, GFP_NOFS);
739         if (thread == NULL)
740                 return ERR_PTR(-ENOMEM);
741
742         return thread;
743 }
744
745 static void tgt_key_fini(const struct lu_context *ctx,
746                          struct lu_context_key *key, void *data)
747 {
748         struct tgt_thread_info          *info = data;
749         struct thandle_exec_args        *args = &info->tti_tea;
750         int                             i;
751
752         for (i = 0; i < args->ta_alloc_args; i++) {
753                 if (args->ta_args[i] != NULL)
754                         OBD_FREE_PTR(args->ta_args[i]);
755         }
756
757         if (args->ta_args != NULL)
758                 OBD_FREE_PTR_ARRAY(args->ta_args, args->ta_alloc_args);
759         OBD_SLAB_FREE_PTR(info, tgt_thread_kmem);
760 }
761
762 /* context key: tg_thread_key */
763 struct lu_context_key tgt_thread_key = {
764         .lct_tags = LCT_MD_THREAD | LCT_DT_THREAD,
765         .lct_init = tgt_key_init,
766         .lct_fini = tgt_key_fini,
767 };
768
769 LU_KEY_INIT_GENERIC(tgt);
770
771 static void *tgt_ses_key_init(const struct lu_context *ctx,
772                               struct lu_context_key *key)
773 {
774         struct tgt_session_info *session;
775
776         OBD_SLAB_ALLOC_PTR_GFP(session, tgt_session_kmem, GFP_NOFS);
777         if (session == NULL)
778                 return ERR_PTR(-ENOMEM);
779
780         return session;
781 }
782
783 static void tgt_ses_key_fini(const struct lu_context *ctx,
784                              struct lu_context_key *key, void *data)
785 {
786         struct tgt_session_info *session = data;
787
788         OBD_SLAB_FREE_PTR(session, tgt_session_kmem);
789 }
790
791 static void tgt_ses_key_exit(const struct lu_context *ctx,
792                              struct lu_context_key *key, void *data)
793 {
794         struct tgt_session_info *tsi = data;
795
796         /**
797          * Check cases when that is true to add proper
798          * handling and set mult_trans
799          */
800         if (!tsi->tsi_mult_trans && tsi->tsi_has_trans > 1)
801                 CDEBUG(D_HA, "total %i transactions per RPC\n",
802                        tsi->tsi_has_trans);
803         tsi->tsi_has_trans = 0;
804         tsi->tsi_mult_trans = false;
805 }
806
807 /* context key: tgt_session_key */
808 struct lu_context_key tgt_session_key = {
809         .lct_tags = LCT_SERVER_SESSION,
810         .lct_init = tgt_ses_key_init,
811         .lct_fini = tgt_ses_key_fini,
812         .lct_exit = tgt_ses_key_exit,
813 };
814 EXPORT_SYMBOL(tgt_session_key);
815
816 LU_KEY_INIT_GENERIC(tgt_ses);
817
818 /*
819  * this page is allocated statically when module is initializing
820  * it is used to simulate data corruptions, see ost_checksum_bulk()
821  * for details. as the original pages provided by the layers below
822  * can be remain in the internal cache, we do not want to modify
823  * them.
824  */
825 struct page *tgt_page_to_corrupt;
826
827 int tgt_mod_init(void)
828 {
829         int     result;
830         ENTRY;
831
832         result = lu_kmem_init(tgt_caches);
833         if (result != 0)
834                 RETURN(result);
835
836         result = lustre_tgt_register_fs();
837         if (result != 0) {
838                 lu_kmem_fini(tgt_caches);
839                 RETURN(result);
840         }
841
842         tgt_page_to_corrupt = alloc_page(GFP_KERNEL);
843
844         tgt_key_init_generic(&tgt_thread_key, NULL);
845         lu_context_key_register_many(&tgt_thread_key, NULL);
846
847         tgt_ses_key_init_generic(&tgt_session_key, NULL);
848         lu_context_key_register_many(&tgt_session_key, NULL);
849         barrier_init();
850
851         update_info_init();
852
853         RETURN(0);
854 }
855
856 void tgt_mod_exit(void)
857 {
858         barrier_fini();
859         if (tgt_page_to_corrupt != NULL)
860                 put_page(tgt_page_to_corrupt);
861
862         lu_context_key_degister(&tgt_thread_key);
863         lu_context_key_degister(&tgt_session_key);
864         update_info_fini();
865
866         lustre_tgt_unregister_fs();
867
868         lu_kmem_fini(tgt_caches);
869 }
870