Whamcloud - gitweb
6ec782639a3dcaea3b3e70b3c736a1211fe399e6
[fs/lustre-release.git] / lustre / lod / lod_dev.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/lod/lod_dev.c
33  *
34  * Lustre Logical Object Device
35  *
36  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
37  * Author: Mikhail Pershin <mike.pershin@intel.com>
38  */
39 /**
40  * The Logical Object Device (LOD) layer manages access to striped
41  * objects (both regular files and directories). It implements the DT
42  * device and object APIs and is responsible for creating, storing,
43  * and loading striping information as an extended attribute of the
44  * underlying OSD object. LOD is the server side analog of the LOV and
45  * LMV layers on the client side.
46  *
47  * Metadata LU object stack (layers of the same compound LU object,
48  * all have the same FID):
49  *
50  *        MDT
51  *         |      MD API
52  *        MDD
53  *         |      DT API
54  *        LOD
55  *       /   \    DT API
56  *     OSD   OSP
57  *
58  * During LOD object initialization the localness or remoteness of the
59  * object FID dictates the choice between OSD and OSP.
60  *
61  * An LOD object (file or directory) with N stripes (each has a
62  * different FID):
63  *
64  *          LOD
65  *           |
66  *   +---+---+---+...+
67  *   |   |   |   |   |
68  *   S0  S1  S2  S3  S(N-1)  OS[DP] objects, seen as DT objects by LOD
69  *
70  * When upper layers must access an object's stripes (which are
71  * themselves OST or MDT LU objects) LOD finds these objects by their
72  * FIDs and stores them as an array of DT object pointers on the
73  * object. Declarations and operations on LOD objects are received by
74  * LOD (as DT object operations) and performed on the underlying
75  * OS[DP] object and (as needed) on the stripes. From the perspective
76  * of LOD, a stripe-less file (created by mknod() or open with
77  * O_LOV_DELAY_CREATE) is an object which does not yet have stripes,
78  * while a non-striped directory (created by mkdir()) is an object
79  * which will never have stripes.
80  *
81  * The LOD layer also implements a small subset of the OBD device API
82  * to support MDT stack initialization and finalization (an MDD device
83  * connects and disconnects itself to and from the underlying LOD
84  * device), and pool management. In turn LOD uses the OBD device API
85  * to connect it self to the underlying OSD, and to connect itself to
86  * OSP devices representing the MDTs and OSTs that bear the stripes of
87  * its objects.
88  */
89
90 #define DEBUG_SUBSYSTEM S_MDS
91
92 #include <linux/kthread.h>
93 #include <obd_class.h>
94 #include <md_object.h>
95 #include <lustre_fid.h>
96 #include <uapi/linux/lustre/lustre_param.h>
97 #include <lustre_update.h>
98 #include <lustre_log.h>
99 #include <lustre_lmv.h>
100
101 #include "lod_internal.h"
102
103 static const char lod_update_log_name[] = "update_log";
104 static const char lod_update_log_dir_name[] = "update_log_dir";
105
106 /*
107  * Lookup target by FID.
108  *
109  * Lookup MDT/OST target index by FID. Type of the target can be
110  * specific or any.
111  *
112  * \param[in] env               LU environment provided by the caller
113  * \param[in] lod               lod device
114  * \param[in] fid               FID
115  * \param[out] tgt              result target index
116  * \param[in] type              expected type of the target:
117  *                              LU_SEQ_RANGE_{MDT,OST,ANY}
118  *
119  * \retval 0                    on success
120  * \retval negative             negated errno on error
121  **/
122 int lod_fld_lookup(const struct lu_env *env, struct lod_device *lod,
123                    const struct lu_fid *fid, u32 *tgt, int *type)
124 {
125         struct lu_seq_range range = { 0 };
126         struct lu_server_fld *server_fld;
127         int rc;
128
129         ENTRY;
130
131         if (!fid_is_sane(fid)) {
132                 CERROR("%s: invalid FID "DFID"\n", lod2obd(lod)->obd_name,
133                        PFID(fid));
134                 RETURN(-EIO);
135         }
136
137         if (fid_is_idif(fid)) {
138                 *tgt = fid_idif_ost_idx(fid);
139                 *type = LU_SEQ_RANGE_OST;
140                 RETURN(0);
141         }
142
143         if (fid_is_update_log(fid) || fid_is_update_log_dir(fid)) {
144                 *tgt = fid_oid(fid);
145                 *type = LU_SEQ_RANGE_MDT;
146                 RETURN(0);
147         }
148
149         if (!lod->lod_initialized || (!fid_seq_in_fldb(fid_seq(fid)))) {
150                 LASSERT(lu_site2seq(lod2lu_dev(lod)->ld_site) != NULL);
151
152                 *tgt = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
153                 *type = LU_SEQ_RANGE_MDT;
154                 RETURN(0);
155         }
156
157         server_fld = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_server_fld;
158         if (!server_fld)
159                 RETURN(-EIO);
160
161         fld_range_set_type(&range, *type);
162         rc = fld_server_lookup(env, server_fld, fid_seq(fid), &range);
163         if (rc != 0)
164                 RETURN(rc);
165
166         *tgt = range.lsr_index;
167         *type = range.lsr_flags;
168
169         CDEBUG(D_INFO, "%s: got tgt %x for sequence: %#llx\n",
170                lod2obd(lod)->obd_name, *tgt, fid_seq(fid));
171
172         RETURN(0);
173 }
174
175 /* Slab for OSD object allocation */
176 struct kmem_cache *lod_object_kmem;
177
178 /* Slab for dt_txn_callback */
179 struct kmem_cache *lod_txn_callback_kmem;
180 static struct lu_kmem_descr lod_caches[] = {
181         {
182                 .ckd_cache = &lod_object_kmem,
183                 .ckd_name  = "lod_obj",
184                 .ckd_size  = sizeof(struct lod_object)
185         },
186         {
187                 .ckd_cache = &lod_txn_callback_kmem,
188                 .ckd_name  = "lod_txn_callback",
189                 .ckd_size  = sizeof(struct dt_txn_callback)
190         },
191         {
192                 .ckd_cache = NULL
193         }
194 };
195
196 static struct lu_device *lod_device_fini(const struct lu_env *env,
197                                          struct lu_device *d);
198
199 /**
200  * Implementation of lu_device_operations::ldo_object_alloc() for LOD
201  *
202  * Allocates and initializes LOD's slice in the given object.
203  *
204  * see include/lu_object.h for the details.
205  */
206 static struct lu_object *lod_object_alloc(const struct lu_env *env,
207                                           const struct lu_object_header *hdr,
208                                           struct lu_device *dev)
209 {
210         struct lod_object *lod_obj;
211         struct lu_object *lu_obj;
212
213         ENTRY;
214
215         OBD_SLAB_ALLOC_PTR_GFP(lod_obj, lod_object_kmem, GFP_NOFS);
216         if (!lod_obj)
217                 RETURN(ERR_PTR(-ENOMEM));
218
219         mutex_init(&lod_obj->ldo_layout_mutex);
220         lu_obj = lod2lu_obj(lod_obj);
221         dt_object_init(&lod_obj->ldo_obj, NULL, dev);
222         lod_obj->ldo_obj.do_ops = &lod_obj_ops;
223         lu_obj->lo_ops = &lod_lu_obj_ops;
224
225         RETURN(lu_obj);
226 }
227
228 /**
229  * Process the config log for all sub device.
230  *
231  * The function goes through all the targets in the given table
232  * and apply given configuration command on to the targets.
233  * Used to cleanup the targets at unmount.
234  *
235  * \param[in] env               LU environment provided by the caller
236  * \param[in] lod               lod device
237  * \param[in] ltd               target's table to go through
238  * \param[in] lcfg              configuration command to apply
239  *
240  * \retval 0                    on success
241  * \retval negative             negated errno on error
242  **/
243 static int lod_sub_process_config(const struct lu_env *env,
244                                  struct lod_device *lod,
245                                  struct lod_tgt_descs *ltd,
246                                  struct lustre_cfg *lcfg)
247 {
248         struct lu_device *next;
249         struct lu_tgt_desc *tgt;
250         int rc = 0;
251
252         lod_getref(ltd);
253         ltd_foreach_tgt(ltd, tgt) {
254                 int rc1;
255
256                 LASSERT(tgt && tgt->ltd_tgt);
257                 next = &tgt->ltd_tgt->dd_lu_dev;
258                 rc1 = next->ld_ops->ldo_process_config(env, next, lcfg);
259                 if (rc1) {
260                         CERROR("%s: error cleaning up LOD index %u: cmd %#x : rc = %d\n",
261                                lod2obd(lod)->obd_name, tgt->ltd_index,
262                                lcfg->lcfg_command, rc1);
263                         rc = rc1;
264                 }
265         }
266         lod_putref(lod, ltd);
267         return rc;
268 }
269
270 struct lod_recovery_data {
271         struct lod_device       *lrd_lod;
272         struct lod_tgt_desc     *lrd_ltd;
273         struct task_struct      **lrd_task;
274         u32                     lrd_idx;
275         struct lu_env           lrd_env;
276         struct completion       *lrd_started;
277 };
278
279
280 /**
281  * process update recovery record
282  *
283  * Add the update recovery recode to the update recovery list in
284  * lod_recovery_data. Then the recovery thread (target_recovery_thread)
285  * will redo these updates.
286  *
287  * \param[in]env        execution environment
288  * \param[in]llh        log handle of update record
289  * \param[in]rec        update record to be replayed
290  * \param[in]data       update recovery data which holds the necessary
291  *                      arguments for recovery (see struct lod_recovery_data)
292  *
293  * \retval              0 if the record is processed successfully.
294  * \retval              negative errno if the record processing fails.
295  */
296 static int lod_process_recovery_updates(const struct lu_env *env,
297                                         struct llog_handle *llh,
298                                         struct llog_rec_hdr *rec,
299                                         void *data)
300 {
301         struct lod_recovery_data *lrd = data;
302         struct llog_cookie *cookie = &lod_env_info(env)->lti_cookie;
303         struct lu_target *lut;
304         u32 index = 0;
305
306         ENTRY;
307
308         if (!lrd->lrd_ltd) {
309                 int rc;
310
311                 rc = lodname2mdt_index(lod2obd(lrd->lrd_lod)->obd_name, &index);
312                 if (rc != 0)
313                         return rc;
314         } else {
315                 index = lrd->lrd_ltd->ltd_index;
316         }
317
318         if (rec->lrh_len !=
319                 llog_update_record_size((struct llog_update_record *)rec)) {
320                 CERROR("%s: broken update record! index %u "DFID".%u: rc = %d\n",
321                        lod2obd(lrd->lrd_lod)->obd_name, index,
322                        PFID(&llh->lgh_id.lgl_oi.oi_fid), rec->lrh_index, -EIO);
323                 return -EINVAL;
324         }
325
326         cookie->lgc_lgl = llh->lgh_id;
327         cookie->lgc_index = rec->lrh_index;
328         cookie->lgc_subsys = LLOG_UPDATELOG_ORIG_CTXT;
329
330         CDEBUG(D_HA, "%s: process recovery updates "DFID".%u\n",
331                lod2obd(lrd->lrd_lod)->obd_name,
332                PFID(&llh->lgh_id.lgl_oi.oi_fid), rec->lrh_index);
333         lut = lod2lu_dev(lrd->lrd_lod)->ld_site->ls_tgt;
334
335         if (lut->lut_obd->obd_stopping ||
336             lut->lut_obd->obd_abort_recovery)
337                 return -ESHUTDOWN;
338
339         return insert_update_records_to_replay_list(lut->lut_tdtd,
340                                         (struct llog_update_record *)rec,
341                                         cookie, index);
342 }
343
344 /**
345  * recovery thread for update log
346  *
347  * Start recovery thread and prepare the sub llog, then it will retrieve
348  * the update records from the correpondent MDT and do recovery.
349  *
350  * \param[in] arg       pointer to the recovery data
351  *
352  * \retval              0 if recovery succeeds
353  * \retval              negative errno if recovery failed.
354  */
355 static int lod_sub_recovery_thread(void *arg)
356 {
357         struct lod_recovery_data *lrd = arg;
358         struct lod_device *lod = lrd->lrd_lod;
359         struct dt_device *dt;
360         struct llog_ctxt *ctxt = NULL;
361         struct lu_env *env = &lrd->lrd_env;
362         struct lu_target *lut;
363         struct lu_tgt_desc *mdt = NULL;
364         time64_t start;
365         int retries = 0;
366         int rc;
367
368         ENTRY;
369
370         lut = lod2lu_dev(lod)->ld_site->ls_tgt;
371         atomic_inc(&lut->lut_tdtd->tdtd_recovery_threads_count);
372         if (!lrd->lrd_ltd)
373                 dt = lod->lod_child;
374         else
375                 dt = lrd->lrd_ltd->ltd_tgt;
376
377         start = ktime_get_real_seconds();
378         complete(lrd->lrd_started);
379
380 again:
381
382         if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_TGT_RECOVERY_CONNECT)) &&
383             lrd->lrd_ltd) {
384                 OBD_FAIL_TIMEOUT(OBD_FAIL_TGT_RECOVERY_CONNECT, cfs_fail_val);
385                 rc = -EIO;
386         } else {
387                 rc = lod_sub_prep_llog(env, lod, dt, lrd->lrd_idx);
388         }
389         if (!rc && !lod->lod_child->dd_rdonly) {
390                 /* Process the recovery record */
391                 ctxt = llog_get_context(dt->dd_lu_dev.ld_obd,
392                                         LLOG_UPDATELOG_ORIG_CTXT);
393                 LASSERT(ctxt != NULL);
394                 LASSERT(ctxt->loc_handle != NULL);
395
396                 rc = llog_cat_process(env, ctxt->loc_handle,
397                                       lod_process_recovery_updates, lrd, 0, 0);
398         }
399
400         if (rc < 0) {
401                 struct lu_device *top_device;
402
403                 top_device = lod->lod_dt_dev.dd_lu_dev.ld_site->ls_top_dev;
404                 /*
405                  * Because the remote target might failover at the same time,
406                  * let's retry here
407                  */
408                 if ((rc == -ETIMEDOUT || rc == -EAGAIN || rc == -EIO) &&
409                      dt != lod->lod_child &&
410                     !top_device->ld_obd->obd_abort_recovery &&
411                     !top_device->ld_obd->obd_stopping) {
412                         if (ctxt) {
413                                 if (ctxt->loc_handle)
414                                         llog_cat_close(env,
415                                                        ctxt->loc_handle);
416                                 llog_ctxt_put(ctxt);
417                         }
418                         retries++;
419                         CDEBUG(D_HA, "%s get update log failed %d, retry\n",
420                                dt->dd_lu_dev.ld_obd->obd_name, rc);
421                         goto again;
422                 }
423
424                 CERROR("%s get update log failed: rc = %d\n",
425                        dt->dd_lu_dev.ld_obd->obd_name, rc);
426                 llog_ctxt_put(ctxt);
427
428                 spin_lock(&top_device->ld_obd->obd_dev_lock);
429                 if (!top_device->ld_obd->obd_abort_recovery &&
430                     !top_device->ld_obd->obd_stopping)
431                         top_device->ld_obd->obd_abort_recovery = 1;
432                 spin_unlock(&top_device->ld_obd->obd_dev_lock);
433
434                 GOTO(out, rc);
435         }
436         llog_ctxt_put(ctxt);
437
438         CDEBUG(D_HA, "%s retrieved update log, duration %lld, retries %d\n",
439                dt->dd_lu_dev.ld_obd->obd_name, ktime_get_real_seconds() - start,
440                retries);
441
442         spin_lock(&lod->lod_lock);
443         if (!lrd->lrd_ltd)
444                 lod->lod_child_got_update_log = 1;
445         else
446                 lrd->lrd_ltd->ltd_got_update_log = 1;
447
448         if (!lod->lod_child_got_update_log) {
449                 spin_unlock(&lod->lod_lock);
450                 GOTO(out, rc = 0);
451         }
452
453         lod_foreach_mdt(lod, mdt) {
454                 if (!mdt->ltd_got_update_log) {
455                         spin_unlock(&lod->lod_lock);
456                         GOTO(out, rc = 0);
457                 }
458         }
459         lut->lut_tdtd->tdtd_replay_ready = 1;
460         spin_unlock(&lod->lod_lock);
461
462         CDEBUG(D_HA, "%s got update logs from all MDTs.\n",
463                lut->lut_obd->obd_name);
464         wake_up(&lut->lut_obd->obd_next_transno_waitq);
465         EXIT;
466
467 out:
468         atomic_dec(&lut->lut_tdtd->tdtd_recovery_threads_count);
469         wake_up(&lut->lut_tdtd->tdtd_recovery_threads_waitq);
470         if (xchg(lrd->lrd_task, NULL) == NULL)
471                 /* Someone is waiting for us to finish, need
472                  * to synchronize cleanly.
473                  */
474                 wait_var_event(lrd, kthread_should_stop());
475         lu_env_fini(env);
476         OBD_FREE_PTR(lrd);
477         return 0;
478 }
479
480 /**
481  * finish sub llog context
482  *
483  * Stop update recovery thread for the sub device, then cleanup the
484  * correspondent llog ctxt.
485  *
486  * \param[in] env      execution environment
487  * \param[in] lod      lod device to do update recovery
488  * \param[in] thread   recovery thread on this sub device
489  */
490 void lod_sub_fini_llog(const struct lu_env *env,
491                        struct dt_device *dt, struct task_struct **thread)
492 {
493         struct obd_device *obd;
494         struct llog_ctxt *ctxt;
495         struct task_struct *task = NULL;
496
497         ENTRY;
498
499         obd = dt->dd_lu_dev.ld_obd;
500         CDEBUG(D_INFO, "%s: finish sub llog\n", obd->obd_name);
501         /* Wait for recovery thread to complete */
502         if (thread)
503                 task = xchg(thread, NULL);
504         if (task)
505                 kthread_stop(task);
506
507         ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT);
508         if (!ctxt)
509                 RETURN_EXIT;
510
511         if (ctxt->loc_handle)
512                 llog_cat_close(env, ctxt->loc_handle);
513
514         llog_cleanup(env, ctxt);
515
516         RETURN_EXIT;
517 }
518
519 /**
520  * Extract MDT target index from a device name.
521  *
522  * a helper function to extract index from the given device name
523  * like "fsname-MDTxxxx-mdtlov"
524  *
525  * \param[in] lodname           device name
526  * \param[out] mdt_index        extracted index
527  *
528  * \retval 0            on success
529  * \retval -EINVAL      if the name is invalid
530  */
531 int lodname2mdt_index(char *lodname, u32 *mdt_index)
532 {
533         u32 index;
534         const char *ptr, *tmp;
535         int rc;
536
537         /* 1.8 configs don't have "-MDT0000" at the end */
538         ptr = strstr(lodname, "-MDT");
539         if (!ptr) {
540                 *mdt_index = 0;
541                 return 0;
542         }
543
544         ptr = strrchr(lodname, '-');
545         if (!ptr) {
546                 rc = -EINVAL;
547                 CERROR("invalid MDT index in '%s': rc = %d\n", lodname, rc);
548                 return rc;
549         }
550
551         if (strncmp(ptr, "-mdtlov", 7) != 0) {
552                 rc = -EINVAL;
553                 CERROR("invalid MDT index in '%s': rc = %d\n", lodname, rc);
554                 return rc;
555         }
556
557         if ((unsigned long)ptr - (unsigned long)lodname <= 8) {
558                 rc = -EINVAL;
559                 CERROR("invalid MDT index in '%s': rc = %d\n", lodname, rc);
560                 return rc;
561         }
562
563         if (strncmp(ptr - 8, "-MDT", 4) != 0) {
564                 rc = -EINVAL;
565                 CERROR("invalid MDT index in '%s': rc = %d\n", lodname, rc);
566                 return rc;
567         }
568
569         rc = target_name2index(ptr - 7, &index, &tmp);
570         if (rc < 0 || rc & LDD_F_SV_ALL || *tmp != '-') {
571                 rc = -EINVAL;
572                 CERROR("invalid MDT index in '%s': rc = %d\n", lodname, rc);
573                 return rc;
574         }
575         *mdt_index = index;
576         return 0;
577 }
578
579 /**
580  * Init sub llog context
581  *
582  * Setup update llog ctxt for update recovery threads, then start the
583  * recovery thread (lod_sub_recovery_thread) to read update llog from
584  * the correspondent MDT to do update recovery.
585  *
586  * \param[in] env       execution environment
587  * \param[in] lod       lod device to do update recovery
588  * \param[in] dt        sub dt device for which the recovery thread is
589  *
590  * \retval              0 if initialization succeeds.
591  * \retval              negative errno if initialization fails.
592  */
593 int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod,
594                       struct dt_device *dt)
595 {
596         struct obd_device *obd;
597         struct lod_recovery_data *lrd = NULL;
598         DECLARE_COMPLETION_ONSTACK(started);
599         struct task_struct **taskp;
600         struct task_struct *task;
601         struct lod_tgt_desc *subtgt = NULL;
602         u32 index;
603         u32 master_index;
604         int rc;
605
606         ENTRY;
607
608         rc = lodname2mdt_index(lod2obd(lod)->obd_name, &master_index);
609         if (rc != 0)
610                 RETURN(rc);
611
612         OBD_ALLOC_PTR(lrd);
613         if (!lrd)
614                 RETURN(-ENOMEM);
615
616         if (lod->lod_child == dt) {
617                 taskp = &lod->lod_child_recovery_task;
618                 index = master_index;
619         } else {
620                 struct lu_tgt_desc *mdt;
621
622                 lod_foreach_mdt(lod, mdt) {
623                         if (mdt->ltd_tgt == dt) {
624                                 index = mdt->ltd_index;
625                                 subtgt = mdt;
626                                 break;
627                         }
628                 }
629                 LASSERT(subtgt != NULL);
630                 taskp = &subtgt->ltd_recovery_task;
631         }
632
633         CDEBUG(D_INFO, "%s init sub log %s\n", lod2obd(lod)->obd_name,
634                dt->dd_lu_dev.ld_obd->obd_name);
635         lrd->lrd_lod = lod;
636         lrd->lrd_ltd = subtgt;
637         lrd->lrd_task = taskp;
638         lrd->lrd_idx = index;
639         lrd->lrd_started = &started;
640
641         obd = dt->dd_lu_dev.ld_obd;
642         obd->obd_lvfs_ctxt.dt = dt;
643         rc = llog_setup(env, obd, &obd->obd_olg, LLOG_UPDATELOG_ORIG_CTXT,
644                         NULL, &llog_common_cat_ops);
645         if (rc < 0) {
646                 CERROR("%s: cannot setup updatelog llog: rc = %d\n",
647                        obd->obd_name, rc);
648                 GOTO(free_lrd, rc);
649         }
650
651         rc = lu_env_init(&lrd->lrd_env, LCT_LOCAL | LCT_MD_THREAD);
652         if (rc != 0) {
653                 CERROR("%s: can't initialize env: rc = %d\n",
654                        lod2obd(lod)->obd_name, rc);
655                 GOTO(free_lrd, rc);
656         }
657
658         /* Start the recovery thread */
659         task = kthread_create(lod_sub_recovery_thread, lrd, "lod%04x_rec%04x",
660                               master_index, index);
661         if (IS_ERR(task)) {
662                 rc = PTR_ERR(task);
663                 CERROR("%s: cannot start recovery thread: rc = %d\n",
664                        obd->obd_name, rc);
665                 lu_env_fini(&lrd->lrd_env);
666                 GOTO(out_llog, rc);
667         }
668         *taskp = task;
669         wake_up_process(task);
670         wait_for_completion(&started);
671
672         RETURN(0);
673 out_llog:
674         lod_sub_fini_llog(env, dt, taskp);
675 free_lrd:
676         OBD_FREE_PTR(lrd);
677         RETURN(rc);
678 }
679
680 /**
681  * Stop sub recovery thread
682  *
683  * Stop sub recovery thread on all subs.
684  *
685  * \param[in] env       execution environment
686  * \param[in] lod       lod device to do update recovery
687  */
688 static void lod_sub_stop_recovery_threads(const struct lu_env *env,
689                                           struct lod_device *lod)
690 {
691         struct task_struct *task;
692         struct lu_tgt_desc *mdt;
693
694         /*
695          * Stop the update log commit cancel threads and finish master
696          * llog ctxt
697          */
698         task = xchg(&lod->lod_child_recovery_task, NULL);
699         if (task)
700                 kthread_stop(task);
701
702         lod_getref(&lod->lod_mdt_descs);
703         lod_foreach_mdt(lod, mdt) {
704                 task = xchg(&mdt->ltd_recovery_task, NULL);
705                 if (task)
706                         kthread_stop(task);
707         }
708         lod_putref(lod, &lod->lod_mdt_descs);
709 }
710
711 /**
712  * finish all sub llog
713  *
714  * cleanup all of sub llog ctxt on the LOD.
715  *
716  * \param[in] env       execution environment
717  * \param[in] lod       lod device to do update recovery
718  */
719 static void lod_sub_fini_all_llogs(const struct lu_env *env,
720                                    struct lod_device *lod)
721 {
722         struct lu_tgt_desc *mdt;
723
724         /*
725          * Stop the update log commit cancel threads and finish master
726          * llog ctxt
727          */
728         lod_sub_fini_llog(env, lod->lod_child,
729                           &lod->lod_child_recovery_task);
730         lod_getref(&lod->lod_mdt_descs);
731         lod_foreach_mdt(lod, mdt)
732                 lod_sub_fini_llog(env, mdt->ltd_tgt,
733                                   &mdt->ltd_recovery_task);
734         lod_putref(lod, &lod->lod_mdt_descs);
735 }
736
737 static char *lod_show_update_logs_retrievers(void *data, int *size, int *count)
738 {
739         struct lod_device *lod = (struct lod_device *)data;
740         struct lu_target *lut = lod2lu_dev(lod)->ld_site->ls_tgt;
741         struct lu_tgt_desc *mdt = NULL;
742         char *buf;
743         int len = 0;
744         int rc;
745         int i;
746
747         *count = atomic_read(&lut->lut_tdtd->tdtd_recovery_threads_count);
748         if (*count == 0) {
749                 *size = 0;
750                 return NULL;
751         }
752
753         *size = 5 * *count + 1;
754         OBD_ALLOC(buf, *size);
755         if (!buf)
756                 return NULL;
757
758         *count = 0;
759         memset(buf, 0, *size);
760
761         if (!lod->lod_child_got_update_log) {
762                 rc = lodname2mdt_index(lod2obd(lod)->obd_name, &i);
763                 LASSERTF(rc == 0, "Fail to parse target index: rc = %d\n", rc);
764
765                 rc = scnprintf(buf + len, *size - len, " %04x", i);
766                 LASSERT(rc > 0);
767
768                 len += rc;
769                 (*count)++;
770         }
771
772         lod_foreach_mdt(lod, mdt) {
773                 if (!mdt->ltd_got_update_log) {
774                         rc = scnprintf(buf + len, *size - len, " %04x",
775                                        mdt->ltd_index);
776                         if (unlikely(rc <= 0))
777                                 break;
778
779                         len += rc;
780                         (*count)++;
781                 }
782         }
783
784         return buf;
785 }
786
787 /**
788  * Prepare distribute txn
789  *
790  * Prepare distribute txn structure for LOD
791  *
792  * \param[in] env       execution environment
793  * \param[in] lod_device  LOD device
794  *
795  * \retval              0 if preparation succeeds.
796  * \retval              negative errno if preparation fails.
797  */
798 static int lod_prepare_distribute_txn(const struct lu_env *env,
799                                       struct lod_device *lod)
800 {
801         struct target_distribute_txn_data *tdtd;
802         struct lu_target *lut;
803         int rc;
804
805         ENTRY;
806
807         /* Init update recovery data */
808         OBD_ALLOC_PTR(tdtd);
809         if (!tdtd)
810                 RETURN(-ENOMEM);
811
812         lut = lod2lu_dev(lod)->ld_site->ls_tgt;
813         tdtd->tdtd_dt = &lod->lod_dt_dev;
814         rc = distribute_txn_init(env, lut, tdtd,
815                 lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id);
816
817         if (rc < 0) {
818                 CERROR("%s: cannot init distribute txn: rc = %d\n",
819                        lod2obd(lod)->obd_name, rc);
820                 OBD_FREE_PTR(tdtd);
821                 RETURN(rc);
822         }
823
824         tdtd->tdtd_show_update_logs_retrievers =
825                 lod_show_update_logs_retrievers;
826         tdtd->tdtd_show_retrievers_cbdata = lod;
827
828         lut->lut_tdtd = tdtd;
829
830         RETURN(0);
831 }
832
833 /**
834  * Finish distribute txn
835  *
836  * Release the resource holding by distribute txn, i.e. stop distribute
837  * txn thread.
838  *
839  * \param[in] env       execution environment
840  * \param[in] lod       lod device
841  */
842 static void lod_fini_distribute_txn(const struct lu_env *env,
843                                     struct lod_device *lod)
844 {
845         struct lu_target *lut;
846
847         lut = lod2lu_dev(lod)->ld_site->ls_tgt;
848         target_recovery_fini(lut->lut_obd);
849         if (!lut->lut_tdtd)
850                 return;
851
852         distribute_txn_fini(env, lut->lut_tdtd);
853
854         OBD_FREE_PTR(lut->lut_tdtd);
855         lut->lut_tdtd = NULL;
856 }
857
858 /**
859  * Implementation of lu_device_operations::ldo_process_config() for LOD
860  *
861  * The method is called by the configuration subsystem during setup,
862  * cleanup and when the configuration changes. The method processes
863  * few specific commands like adding/removing the targets, changing
864  * the runtime parameters.
865
866  * \param[in] env               LU environment provided by the caller
867  * \param[in] dev               lod device
868  * \param[in] lcfg              configuration command to apply
869  *
870  * \retval 0                    on success
871  * \retval negative             negated errno on error
872  *
873  * The examples are below.
874  *
875  * Add osc config log:
876  * marker  20 (flags=0x01, v2.2.49.56) lustre-OST0001  'add osc'
877  * add_uuid  nid=192.168.122.162@tcp(0x20000c0a87aa2)  0:  1:nidxxx
878  * attach    0:lustre-OST0001-osc-MDT0001  1:osc  2:lustre-MDT0001-mdtlov_UUID
879  * setup     0:lustre-OST0001-osc-MDT0001  1:lustre-OST0001_UUID  2:nid
880  * lov_modify_tgts add 0:lustre-MDT0001-mdtlov  1:lustre-OST0001_UUID  2:1  3:1
881  * marker  20 (flags=0x02, v2.2.49.56) lustre-OST0001  'add osc'
882  *
883  * Add mdc config log:
884  * marker  10 (flags=0x01, v2.2.49.56) lustre-MDT0000  'add osp'
885  * add_uuid  nid=192.168.122.162@tcp(0x20000c0a87aa2)  0:  1:nid
886  * attach 0:lustre-MDT0000-osp-MDT0001  1:osp  2:lustre-MDT0001-mdtlov_UUID
887  * setup     0:lustre-MDT0000-osp-MDT0001  1:lustre-MDT0000_UUID  2:nid
888  * modify_mdc_tgts add 0:lustre-MDT0001  1:lustre-MDT0000_UUID  2:0  3:1
889  * marker  10 (flags=0x02, v2.2.49.56) lustre-MDT0000_UUID  'add osp'
890  */
891 static int lod_process_config(const struct lu_env *env,
892                               struct lu_device *dev,
893                               struct lustre_cfg *lcfg)
894 {
895         struct lod_device *lod = lu2lod_dev(dev);
896         struct lu_device *next = &lod->lod_child->dd_lu_dev;
897         char *arg1;
898         int rc = 0;
899
900         ENTRY;
901
902         switch (lcfg->lcfg_command) {
903         case LCFG_LOV_DEL_OBD:
904         case LCFG_LOV_ADD_INA:
905         case LCFG_LOV_ADD_OBD:
906         case LCFG_ADD_MDC: {
907                 u32 index;
908                 u32 mdt_index;
909                 int gen;
910                 /*
911                  * lov_modify_tgts add  0:lov_mdsA  1:osp  2:0  3:1
912                  * modify_mdc_tgts add  0:lustre-MDT0001
913                  *                    1:lustre-MDT0001-mdc0002
914                  *                    2:2  3:1
915                  */
916                 arg1 = lustre_cfg_string(lcfg, 1);
917
918                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
919                         GOTO(out, rc = -EINVAL);
920                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
921                         GOTO(out, rc = -EINVAL);
922
923                 if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
924                         u32 mdt_index;
925
926                         rc = lodname2mdt_index(lustre_cfg_string(lcfg, 0),
927                                                &mdt_index);
928                         if (rc != 0)
929                                 GOTO(out, rc);
930
931                         rc = lod_add_device(env, lod, arg1, index, gen,
932                                             mdt_index, LUSTRE_OSC_NAME, 1);
933                 } else if (lcfg->lcfg_command == LCFG_ADD_MDC) {
934                         mdt_index = index;
935                         rc = lod_add_device(env, lod, arg1, index, gen,
936                                             mdt_index, LUSTRE_MDC_NAME, 1);
937                 } else if (lcfg->lcfg_command == LCFG_LOV_ADD_INA) {
938                         /*FIXME: Add mdt_index for LCFG_LOV_ADD_INA*/
939                         mdt_index = 0;
940                         rc = lod_add_device(env, lod, arg1, index, gen,
941                                             mdt_index, LUSTRE_OSC_NAME, 0);
942                 } else {
943                         rc = lod_del_device(env, lod, &lod->lod_ost_descs,
944                                             arg1, index, gen);
945                 }
946
947                 break;
948         }
949
950         case LCFG_PARAM: {
951                 struct obd_device *obd;
952                 ssize_t count;
953                 char *param;
954
955                 /*
956                  * Check if it is activate/deactivate mdc
957                  * lustre-MDTXXXX-osp-MDTXXXX.active=1
958                  */
959                 param = lustre_cfg_buf(lcfg, 1);
960                 if (strstr(param, "osp") && strstr(param, ".active=")) {
961                         struct lod_tgt_desc *sub_tgt = NULL;
962                         struct lu_tgt_desc *mdt;
963                         char *ptr;
964                         char *tmp;
965
966                         ptr = strstr(param, ".");
967                         *ptr = '\0';
968                         obd = class_name2obd(param);
969                         if (!obd) {
970                                 CERROR("%s: can not find %s: rc = %d\n",
971                                        lod2obd(lod)->obd_name, param, -EINVAL);
972                                 *ptr = '.';
973                                 GOTO(out, rc);
974                         }
975
976                         lod_foreach_mdt(lod, mdt) {
977                                 if (mdt->ltd_tgt->dd_lu_dev.ld_obd == obd) {
978                                         sub_tgt = mdt;
979                                         break;
980                                 }
981                         }
982
983                         if (!sub_tgt) {
984                                 CERROR("%s: can not find %s: rc = %d\n",
985                                        lod2obd(lod)->obd_name, param, -EINVAL);
986                                 *ptr = '.';
987                                 GOTO(out, rc);
988                         }
989
990                         *ptr = '.';
991                         tmp = strstr(param, "=");
992                         tmp++;
993                         if (*tmp == '1') {
994                                 struct llog_ctxt *ctxt;
995
996                                 obd = sub_tgt->ltd_tgt->dd_lu_dev.ld_obd;
997                                 ctxt = llog_get_context(obd,
998                                                 LLOG_UPDATELOG_ORIG_CTXT);
999                                 if (!ctxt) {
1000                                         rc = llog_setup(env, obd, &obd->obd_olg,
1001                                                        LLOG_UPDATELOG_ORIG_CTXT,
1002                                                     NULL, &llog_common_cat_ops);
1003                                         if (rc < 0)
1004                                                 GOTO(out, rc);
1005                                 } else {
1006                                         llog_ctxt_put(ctxt);
1007                                 }
1008                                 rc = lod_sub_prep_llog(env, lod,
1009                                                        sub_tgt->ltd_tgt,
1010                                                        sub_tgt->ltd_index);
1011                                 if (rc == 0)
1012                                         sub_tgt->ltd_active = 1;
1013                         } else {
1014                                 lod_sub_fini_llog(env, sub_tgt->ltd_tgt,
1015                                                   NULL);
1016                                 sub_tgt->ltd_active = 0;
1017                         }
1018                         GOTO(out, rc);
1019                 }
1020
1021
1022                 if (strstr(param, PARAM_LOD) != NULL)
1023                         count = class_modify_config(lcfg, PARAM_LOD,
1024                                                     &lod->lod_dt_dev.dd_kobj);
1025                 else
1026                         count = class_modify_config(lcfg, PARAM_LOV,
1027                                                     &lod->lod_dt_dev.dd_kobj);
1028                 rc = count > 0 ? 0 : count;
1029                 GOTO(out, rc);
1030         }
1031         case LCFG_PRE_CLEANUP: {
1032                 lod_sub_process_config(env, lod, &lod->lod_mdt_descs, lcfg);
1033                 lod_sub_process_config(env, lod, &lod->lod_ost_descs, lcfg);
1034                 OBD_FAIL_TIMEOUT(OBD_FAIL_TGT_RECOVERY_CONNECT, cfs_fail_val * 2);
1035                 next = &lod->lod_child->dd_lu_dev;
1036                 rc = next->ld_ops->ldo_process_config(env, next, lcfg);
1037                 if (rc != 0)
1038                         CDEBUG(D_HA, "%s: can't process %u: %d\n",
1039                                lod2obd(lod)->obd_name, lcfg->lcfg_command, rc);
1040
1041                 lod_sub_stop_recovery_threads(env, lod);
1042                 lod_fini_distribute_txn(env, lod);
1043                 lod_sub_fini_all_llogs(env, lod);
1044                 break;
1045         }
1046         case LCFG_CLEANUP: {
1047                 if (lod->lod_md_root) {
1048                         dt_object_put(env, &lod->lod_md_root->ldo_obj);
1049                         lod->lod_md_root = NULL;
1050                 }
1051
1052                 /*
1053                  * do cleanup on underlying storage only when
1054                  * all OSPs are cleaned up, as they use that OSD as well
1055                  */
1056                 lu_dev_del_linkage(dev->ld_site, dev);
1057                 lod_sub_process_config(env, lod, &lod->lod_mdt_descs, lcfg);
1058                 lod_sub_process_config(env, lod, &lod->lod_ost_descs, lcfg);
1059                 next = &lod->lod_child->dd_lu_dev;
1060                 rc = next->ld_ops->ldo_process_config(env, next, lcfg);
1061                 if (rc)
1062                         CERROR("%s: can't process %u: rc = %d\n",
1063                                lod2obd(lod)->obd_name, lcfg->lcfg_command, rc);
1064
1065                 rc = obd_disconnect(lod->lod_child_exp);
1066                 if (rc)
1067                         CERROR("error in disconnect from storage: rc = %d\n",
1068                                rc);
1069                 break;
1070         }
1071         default:
1072                 CERROR("%s: unknown command %u\n", lod2obd(lod)->obd_name,
1073                        lcfg->lcfg_command);
1074                 rc = -EINVAL;
1075                 break;
1076         }
1077
1078 out:
1079         RETURN(rc);
1080 }
1081
1082 /**
1083  * Implementation of lu_device_operations::ldo_recovery_complete() for LOD
1084  *
1085  * The method is called once the recovery is complete. This implementation
1086  * distributes the notification to all the known targets.
1087  *
1088  * see include/lu_object.h for the details
1089  */
1090 static int lod_recovery_complete(const struct lu_env *env,
1091                                  struct lu_device *dev)
1092 {
1093         struct lod_device *lod = lu2lod_dev(dev);
1094         struct lu_device *next = &lod->lod_child->dd_lu_dev;
1095         struct lod_tgt_desc *tgt;
1096         int rc;
1097
1098         ENTRY;
1099
1100         LASSERT(lod->lod_recovery_completed == 0);
1101         lod->lod_recovery_completed = 1;
1102
1103         rc = next->ld_ops->ldo_recovery_complete(env, next);
1104
1105         lod_getref(&lod->lod_ost_descs);
1106         if (lod->lod_ost_descs.ltd_tgts_size > 0) {
1107                 lod_foreach_ost(lod, tgt) {
1108                         LASSERT(tgt && tgt->ltd_tgt);
1109                         next = &tgt->ltd_tgt->dd_lu_dev;
1110                         rc = next->ld_ops->ldo_recovery_complete(env, next);
1111                         if (rc)
1112                                 CERROR("%s: can't complete recovery on #%d: rc = %d\n",
1113                                        lod2obd(lod)->obd_name, tgt->ltd_index,
1114                                        rc);
1115                 }
1116         }
1117         lod_putref(lod, &lod->lod_ost_descs);
1118         RETURN(rc);
1119 }
1120
1121 /**
1122  * Init update logs on all sub device
1123  *
1124  * LOD initialize update logs on all of sub devices. Because the initialization
1125  * process might need FLD lookup, see llog_osd_open()->dt_locate()->...->
1126  * lod_object_init(), this API has to be called after LOD is initialized.
1127  * \param[in] env       execution environment
1128  * \param[in] lod       lod device
1129  *
1130  * \retval              0 if update log is initialized successfully.
1131  * \retval              negative errno if initialization fails.
1132  */
1133 static int lod_sub_init_llogs(const struct lu_env *env, struct lod_device *lod)
1134 {
1135         struct lu_tgt_desc *mdt;
1136         int rc;
1137
1138         ENTRY;
1139
1140         /*
1141          * llog must be setup after LOD is initialized, because llog
1142          * initialization include FLD lookup
1143          */
1144         LASSERT(lod->lod_initialized);
1145
1146         /* Init the llog in its own stack */
1147         rc = lod_sub_init_llog(env, lod, lod->lod_child);
1148         if (rc < 0)
1149                 RETURN(rc);
1150
1151         lod_foreach_mdt(lod, mdt) {
1152                 rc = lod_sub_init_llog(env, lod, mdt->ltd_tgt);
1153                 if (rc != 0)
1154                         break;
1155         }
1156
1157         RETURN(rc);
1158 }
1159
1160 /**
1161  * Implementation of lu_device_operations::ldo_prepare() for LOD
1162  *
1163  * see include/lu_object.h for the details.
1164  */
1165 static int lod_prepare(const struct lu_env *env, struct lu_device *pdev,
1166                        struct lu_device *cdev)
1167 {
1168         struct lod_device *lod = lu2lod_dev(cdev);
1169         struct lu_device *next = &lod->lod_child->dd_lu_dev;
1170         struct lu_fid *fid = &lod_env_info(env)->lti_fid;
1171         int rc;
1172         struct dt_object *root;
1173         struct dt_object *dto;
1174         u32 index;
1175
1176         ENTRY;
1177
1178         rc = next->ld_ops->ldo_prepare(env, pdev, next);
1179         if (rc != 0) {
1180                 CERROR("%s: prepare bottom error: rc = %d\n",
1181                        lod2obd(lod)->obd_name, rc);
1182                 RETURN(rc);
1183         }
1184
1185         lod->lod_initialized = 1;
1186
1187         rc = dt_root_get(env, lod->lod_child, fid);
1188         if (rc < 0)
1189                 RETURN(rc);
1190
1191         root = dt_locate(env, lod->lod_child, fid);
1192         if (IS_ERR(root))
1193                 RETURN(PTR_ERR(root));
1194
1195         /* Create update log object */
1196         index = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
1197         lu_update_log_fid(fid, index);
1198
1199         dto = local_file_find_or_create_with_fid(env, lod->lod_child,
1200                                                  fid, root,
1201                                                  lod_update_log_name,
1202                                                  S_IFREG | 0644);
1203         if (IS_ERR(dto))
1204                 GOTO(out_put, rc = PTR_ERR(dto));
1205
1206         dt_object_put(env, dto);
1207
1208         /* Create update log dir */
1209         lu_update_log_dir_fid(fid, index);
1210         dto = local_file_find_or_create_with_fid(env, lod->lod_child,
1211                                                  fid, root,
1212                                                  lod_update_log_dir_name,
1213                                                  S_IFDIR | 0644);
1214         if (IS_ERR(dto))
1215                 GOTO(out_put, rc = PTR_ERR(dto));
1216
1217         dt_object_put(env, dto);
1218
1219         rc = lod_prepare_distribute_txn(env, lod);
1220         if (rc != 0)
1221                 GOTO(out_put, rc);
1222
1223         rc = lod_sub_init_llogs(env, lod);
1224         if (rc != 0)
1225                 GOTO(out_put, rc);
1226
1227 out_put:
1228         dt_object_put(env, root);
1229
1230         RETURN(rc);
1231 }
1232
1233 /**
1234  * Implementation of lu_device_operations::ldo_fid_alloc() for LOD
1235  *
1236  * Find corresponding device by passed parent and name, and allocate FID from
1237  * there.
1238  *
1239  * see include/lu_object.h for the details.
1240  */
1241 static int lod_fid_alloc(const struct lu_env *env, struct lu_device *d,
1242                          struct lu_fid *fid, struct lu_object *parent,
1243                          const struct lu_name *name)
1244 {
1245         struct lod_device *lod = lu2lod_dev(d);
1246         struct lod_object *lo = lu2lod_obj(parent);
1247         struct dt_device *next;
1248         int rc;
1249
1250         ENTRY;
1251
1252         /* if @parent is remote, we don't know whether its layout was changed,
1253          * always reload layout.
1254          */
1255         if (lu_object_remote(parent))
1256                 lod_striping_free(env, lo);
1257
1258         rc = lod_striping_load(env, lo);
1259         if (rc)
1260                 RETURN(rc);
1261
1262         if (lo->ldo_dir_stripe_count > 0 && name) {
1263                 struct dt_object *stripe;
1264                 int idx;
1265
1266                 idx = __lmv_name_to_stripe_index(lo->ldo_dir_hash_type,
1267                                                  lo->ldo_dir_stripe_count,
1268                                                  lo->ldo_dir_migrate_hash,
1269                                                  lo->ldo_dir_migrate_offset,
1270                                                  name->ln_name,
1271                                                  name->ln_namelen, true);
1272                 if (idx < 0)
1273                         RETURN(idx);
1274
1275                 stripe = lo->ldo_stripe[idx];
1276                 if (!stripe || !dt_object_exists(stripe))
1277                         RETURN(-ENODEV);
1278
1279                 next = lu2dt_dev(stripe->do_lu.lo_dev);
1280         } else {
1281                 next = lod->lod_child;
1282         }
1283
1284         rc = dt_fid_alloc(env, next, fid, parent, name);
1285
1286         RETURN(rc);
1287 }
1288
1289 const struct lu_device_operations lod_lu_ops = {
1290         .ldo_object_alloc       = lod_object_alloc,
1291         .ldo_process_config     = lod_process_config,
1292         .ldo_recovery_complete  = lod_recovery_complete,
1293         .ldo_prepare            = lod_prepare,
1294         .ldo_fid_alloc          = lod_fid_alloc,
1295 };
1296
1297 /**
1298  * Implementation of dt_device_operations::dt_root_get() for LOD
1299  *
1300  * see include/dt_object.h for the details.
1301  */
1302 static int lod_root_get(const struct lu_env *env,
1303                         struct dt_device *dev, struct lu_fid *f)
1304 {
1305         return dt_root_get(env, dt2lod_dev(dev)->lod_child, f);
1306 }
1307
1308 static void lod_statfs_sum(struct obd_statfs *sfs,
1309                              struct obd_statfs *ost_sfs, int *bs)
1310 {
1311         while (ost_sfs->os_bsize < *bs) {
1312                 *bs >>= 1;
1313                 sfs->os_bsize >>= 1;
1314                 sfs->os_bavail <<= 1;
1315                 sfs->os_blocks <<= 1;
1316                 sfs->os_bfree <<= 1;
1317                 sfs->os_granted <<= 1;
1318         }
1319         while (ost_sfs->os_bsize > *bs) {
1320                 ost_sfs->os_bsize >>= 1;
1321                 ost_sfs->os_bavail <<= 1;
1322                 ost_sfs->os_blocks <<= 1;
1323                 ost_sfs->os_bfree <<= 1;
1324                 ost_sfs->os_granted <<= 1;
1325         }
1326         sfs->os_bavail += ost_sfs->os_bavail;
1327         sfs->os_blocks += ost_sfs->os_blocks;
1328         sfs->os_bfree += ost_sfs->os_bfree;
1329         sfs->os_granted += ost_sfs->os_granted;
1330 }
1331
1332 /**
1333  * Implementation of dt_device_operations::dt_statfs() for LOD
1334  *
1335  * see include/dt_object.h for the details.
1336  */
1337 static int lod_statfs(const struct lu_env *env, struct dt_device *dev,
1338                       struct obd_statfs *sfs, struct obd_statfs_info *info)
1339 {
1340         struct lod_device *lod = dt2lod_dev(dev);
1341         struct lu_tgt_desc *tgt;
1342         struct obd_statfs ost_sfs;
1343         u64 ost_files = 0;
1344         u64 ost_ffree = 0;
1345         int rc, bs;
1346
1347         rc = dt_statfs(env, dt2lod_dev(dev)->lod_child, sfs);
1348         if (rc)
1349                 GOTO(out, rc);
1350
1351         bs = sfs->os_bsize;
1352
1353         sfs->os_bavail = 0;
1354         sfs->os_blocks = 0;
1355         sfs->os_bfree = 0;
1356         sfs->os_granted = 0;
1357
1358         lod_getref(&lod->lod_mdt_descs);
1359         lod_foreach_mdt(lod, tgt) {
1360                 rc = dt_statfs(env, tgt->ltd_tgt, &ost_sfs);
1361                 /* ignore errors */
1362                 if (rc)
1363                         continue;
1364                 sfs->os_files += ost_sfs.os_files;
1365                 sfs->os_ffree += ost_sfs.os_ffree;
1366                 lod_statfs_sum(sfs, &ost_sfs, &bs);
1367         }
1368         lod_putref(lod, &lod->lod_mdt_descs);
1369
1370         /*
1371          * at some point we can check whether DoM is enabled and
1372          * decide how to account MDT space. for simplicity let's
1373          * just fallback to pre-DoM policy if any OST is alive
1374          */
1375         lod_getref(&lod->lod_ost_descs);
1376         lod_foreach_ost(lod, tgt) {
1377                 rc = dt_statfs(env, tgt->ltd_tgt, &ost_sfs);
1378                 /* ignore errors */
1379                 if (rc || ost_sfs.os_bsize == 0)
1380                         continue;
1381                 if (!ost_files) {
1382                         /*
1383                          * if only MDTs with DoM then report only MDT blocks,
1384                          * otherwise show only OST blocks, and DoM is "free"
1385                          */
1386                         sfs->os_bavail = 0;
1387                         sfs->os_blocks = 0;
1388                         sfs->os_bfree = 0;
1389                         sfs->os_granted = 0;
1390                 }
1391                 ost_files += ost_sfs.os_files;
1392                 ost_ffree += ost_sfs.os_ffree;
1393                 ost_sfs.os_bavail += ost_sfs.os_granted;
1394                 lod_statfs_sum(sfs, &ost_sfs, &bs);
1395                 LASSERTF(bs == ost_sfs.os_bsize, "%d != %d\n",
1396                         (int)sfs->os_bsize, (int)ost_sfs.os_bsize);
1397         }
1398         lod_putref(lod, &lod->lod_ost_descs);
1399         sfs->os_state |= OS_STATFS_SUM;
1400
1401         /* If we have _some_ OSTs, but don't have as many free objects on the
1402          * OSTs as inodes on the MDTs, reduce the reported number of inodes
1403          * to compensate, so that the "inodes in use" number is correct.
1404          * This should be kept in sync with ll_statfs_internal().
1405          */
1406         if (ost_files && ost_ffree < sfs->os_ffree) {
1407                 sfs->os_files = (sfs->os_files - sfs->os_ffree) + ost_ffree;
1408                 sfs->os_ffree = ost_ffree;
1409         }
1410
1411         /* a single successful statfs should be enough */
1412         rc = 0;
1413
1414 out:
1415         RETURN(rc);
1416 }
1417
1418 /**
1419  * Implementation of dt_device_operations::dt_trans_create() for LOD
1420  *
1421  * Creates a transaction using local (to this node) OSD.
1422  *
1423  * see include/dt_object.h for the details.
1424  */
1425 static struct thandle *lod_trans_create(const struct lu_env *env,
1426                                         struct dt_device *dt)
1427 {
1428         struct thandle *th;
1429
1430         th = top_trans_create(env, dt2lod_dev(dt)->lod_child);
1431         if (IS_ERR(th))
1432                 return th;
1433
1434         th->th_dev = dt;
1435
1436         return th;
1437 }
1438
1439 /**
1440  * Implementation of dt_device_operations::dt_trans_start() for LOD
1441  *
1442  * Starts the set of local transactions using the targets involved
1443  * in declare phase. Initial support for the distributed transactions.
1444  *
1445  * see include/dt_object.h for the details.
1446  */
1447 static int lod_trans_start(const struct lu_env *env, struct dt_device *dt,
1448                            struct thandle *th)
1449 {
1450         return top_trans_start(env, dt2lod_dev(dt)->lod_child, th);
1451 }
1452
1453 static int lod_trans_cb_add(struct thandle *th,
1454                             struct dt_txn_commit_cb *dcb)
1455 {
1456         struct top_thandle      *top_th = container_of(th, struct top_thandle,
1457                                                        tt_super);
1458         return dt_trans_cb_add(top_th->tt_master_sub_thandle, dcb);
1459 }
1460
1461 /**
1462  * add noop update to the update records
1463  *
1464  * Add noop updates to the update records, which is only used in
1465  * test right now.
1466  *
1467  * \param[in] env       execution environment
1468  * \param[in] dt        dt device of lod
1469  * \param[in] th        thandle
1470  * \param[in] count     the count of update records to be added.
1471  *
1472  * \retval              0 if adding succeeds.
1473  * \retval              negative errno if adding fails.
1474  */
1475 static int lod_add_noop_records(const struct lu_env *env,
1476                                 struct dt_device *dt, struct thandle *th,
1477                                 int count)
1478 {
1479         struct top_thandle *top_th;
1480         struct lu_fid *fid = &lod_env_info(env)->lti_fid;
1481         int i;
1482         int rc = 0;
1483
1484         top_th = container_of(th, struct top_thandle, tt_super);
1485         if (!top_th->tt_multiple_thandle)
1486                 return 0;
1487
1488         fid_zero(fid);
1489         for (i = 0; i < count; i++) {
1490                 rc = update_record_pack(noop, th, fid);
1491                 if (rc < 0)
1492                         return rc;
1493         }
1494         return rc;
1495 }
1496
1497 /**
1498  * Implementation of dt_device_operations::dt_trans_stop() for LOD
1499  *
1500  * Stops the set of local transactions using the targets involved
1501  * in declare phase. Initial support for the distributed transactions.
1502  *
1503  * see include/dt_object.h for the details.
1504  */
1505 static int lod_trans_stop(const struct lu_env *env, struct dt_device *dt,
1506                           struct thandle *th)
1507 {
1508         if (OBD_FAIL_CHECK(OBD_FAIL_SPLIT_UPDATE_REC)) {
1509                 int rc;
1510
1511                 rc = lod_add_noop_records(env, dt, th, 5000);
1512                 if (rc < 0)
1513                         RETURN(rc);
1514         }
1515         return top_trans_stop(env, dt2lod_dev(dt)->lod_child, th);
1516 }
1517
1518 /**
1519  * Implementation of dt_device_operations::dt_conf_get() for LOD
1520  *
1521  * Currently returns the configuration provided by the local OSD.
1522  *
1523  * see include/dt_object.h for the details.
1524  */
1525 static void lod_conf_get(const struct lu_env *env,
1526                          const struct dt_device *dev,
1527                          struct dt_device_param *param)
1528 {
1529         dt_conf_get(env, dt2lod_dev((struct dt_device *)dev)->lod_child, param);
1530 }
1531
1532 /**
1533  * Implementation of dt_device_operations::dt_sync() for LOD
1534  *
1535  * Syncs all known OST targets. Very very expensive and used
1536  * rarely by LFSCK now. Should not be used in general.
1537  *
1538  * see include/dt_object.h for the details.
1539  */
1540 static int lod_sync(const struct lu_env *env, struct dt_device *dev)
1541 {
1542         struct lod_device *lod = dt2lod_dev(dev);
1543         struct lu_tgt_desc *tgt;
1544         int rc = 0;
1545
1546         ENTRY;
1547
1548         lod_getref(&lod->lod_ost_descs);
1549         lod_foreach_ost(lod, tgt) {
1550                 if (!tgt->ltd_active)
1551                         continue;
1552                 rc = dt_sync(env, tgt->ltd_tgt);
1553                 if (rc) {
1554                         if (rc != -ENOTCONN) {
1555                                 CERROR("%s: can't sync ost %u: rc = %d\n",
1556                                        lod2obd(lod)->obd_name, tgt->ltd_index,
1557                                        rc);
1558                                 break;
1559                         }
1560                         rc = 0;
1561                 }
1562         }
1563         lod_putref(lod, &lod->lod_ost_descs);
1564
1565         if (rc)
1566                 RETURN(rc);
1567
1568         lod_getref(&lod->lod_mdt_descs);
1569         lod_foreach_mdt(lod, tgt) {
1570                 if (!tgt->ltd_active)
1571                         continue;
1572                 rc = dt_sync(env, tgt->ltd_tgt);
1573                 if (rc) {
1574                         if (rc != -ENOTCONN) {
1575                                 CERROR("%s: can't sync mdt %u: rc = %d\n",
1576                                        lod2obd(lod)->obd_name, tgt->ltd_index,
1577                                        rc);
1578                                 break;
1579                         }
1580                         rc = 0;
1581                 }
1582         }
1583         lod_putref(lod, &lod->lod_mdt_descs);
1584
1585         if (rc == 0)
1586                 rc = dt_sync(env, lod->lod_child);
1587
1588         RETURN(rc);
1589 }
1590
1591 /**
1592  * Implementation of dt_device_operations::dt_ro() for LOD
1593  *
1594  * Turns local OSD read-only, used for the testing only.
1595  *
1596  * see include/dt_object.h for the details.
1597  */
1598 static int lod_ro(const struct lu_env *env, struct dt_device *dev)
1599 {
1600         return dt_ro(env, dt2lod_dev(dev)->lod_child);
1601 }
1602
1603 /**
1604  * Implementation of dt_device_operations::dt_commit_async() for LOD
1605  *
1606  * Asks local OSD to commit sooner.
1607  *
1608  * see include/dt_object.h for the details.
1609  */
1610 static int lod_commit_async(const struct lu_env *env, struct dt_device *dev)
1611 {
1612         return dt_commit_async(env, dt2lod_dev(dev)->lod_child);
1613 }
1614
1615 static const struct dt_device_operations lod_dt_ops = {
1616         .dt_root_get         = lod_root_get,
1617         .dt_statfs           = lod_statfs,
1618         .dt_trans_create     = lod_trans_create,
1619         .dt_trans_start      = lod_trans_start,
1620         .dt_trans_stop       = lod_trans_stop,
1621         .dt_conf_get         = lod_conf_get,
1622         .dt_sync             = lod_sync,
1623         .dt_ro               = lod_ro,
1624         .dt_commit_async     = lod_commit_async,
1625         .dt_trans_cb_add     = lod_trans_cb_add,
1626 };
1627
1628 /**
1629  * Connect to a local OSD.
1630  *
1631  * Used to connect to the local OSD at mount. OSD name is taken from the
1632  * configuration command passed. This connection is used to identify LU
1633  * site and pin the OSD from early removal.
1634  *
1635  * \param[in] env               LU environment provided by the caller
1636  * \param[in] lod               lod device
1637  * \param[in] cfg               configuration command to apply
1638  *
1639  * \retval 0                    on success
1640  * \retval negative             negated errno on error
1641  **/
1642 static int lod_connect_to_osd(const struct lu_env *env, struct lod_device *lod,
1643                               struct lustre_cfg *cfg)
1644 {
1645         struct obd_connect_data *data = NULL;
1646         struct obd_device *obd;
1647         char *nextdev = NULL, *p, *s;
1648         int rc, len = 0;
1649
1650         ENTRY;
1651
1652         LASSERT(cfg);
1653         LASSERT(lod->lod_child_exp == NULL);
1654
1655         /*
1656          * compatibility hack: we still use old config logs
1657          * which specify LOV, but we need to learn underlying
1658          * OSD device, which is supposed to be:
1659          *  <fsname>-MDTxxxx-osd
1660          *
1661          * 2.x MGS generates lines like the following:
1662          *   #03 (176)lov_setup 0:lustre-MDT0000-mdtlov  1:(struct lov_desc)
1663          * 1.8 MGS generates lines like the following:
1664          *   #03 (168)lov_setup 0:lustre-mdtlov  1:(struct lov_desc)
1665          *
1666          * we use "-MDT" to differentiate 2.x from 1.8
1667          */
1668         p = lustre_cfg_string(cfg, 0);
1669         if (p && strstr(p, "-mdtlov")) {
1670                 len = strlen(p) + 6;
1671                 OBD_ALLOC(nextdev, len);
1672                 if (!nextdev)
1673                         GOTO(out, rc = -ENOMEM);
1674
1675                 strcpy(nextdev, p);
1676                 s = strstr(nextdev, "-mdtlov");
1677                 if (unlikely(!s)) {
1678                         CERROR("%s: unable to parse device name: rc = %d\n",
1679                                lustre_cfg_string(cfg, 0), -EINVAL);
1680                         GOTO(out, rc = -EINVAL);
1681                 }
1682
1683                 if (strstr(nextdev, "-MDT")) {
1684                         /* 2.x config */
1685                         strcpy(s, "-osd");
1686                 } else {
1687                         /* 1.8 config */
1688                         strcpy(s, "-MDT0000-osd");
1689                 }
1690         } else {
1691                 CERROR("%s: unable to parse device name: rc = %d\n",
1692                        lustre_cfg_string(cfg, 0), -EINVAL);
1693                 GOTO(out, rc = -EINVAL);
1694         }
1695
1696         OBD_ALLOC_PTR(data);
1697         if (!data)
1698                 GOTO(out, rc = -ENOMEM);
1699
1700         obd = class_name2obd(nextdev);
1701         if (!obd) {
1702                 CERROR("%s: can not locate next device: rc = %d\n",
1703                        nextdev, -ENOTCONN);
1704                 GOTO(out, rc = -ENOTCONN);
1705         }
1706
1707         data->ocd_connect_flags = OBD_CONNECT_VERSION;
1708         data->ocd_version = LUSTRE_VERSION_CODE;
1709
1710         rc = obd_connect(env, &lod->lod_child_exp, obd, &obd->obd_uuid,
1711                          data, NULL);
1712         if (rc) {
1713                 CERROR("%s: cannot connect to next dev: rc = %d\n",
1714                        nextdev, rc);
1715                 GOTO(out, rc);
1716         }
1717
1718         lod->lod_dt_dev.dd_lu_dev.ld_site =
1719                 lod->lod_child_exp->exp_obd->obd_lu_dev->ld_site;
1720         LASSERT(lod->lod_dt_dev.dd_lu_dev.ld_site);
1721         lod->lod_child = lu2dt_dev(lod->lod_child_exp->exp_obd->obd_lu_dev);
1722
1723 out:
1724         if (data)
1725                 OBD_FREE_PTR(data);
1726         if (nextdev)
1727                 OBD_FREE(nextdev, len);
1728         RETURN(rc);
1729 }
1730
1731 static int lod_lsfs_init(const struct lu_env *env, struct lod_device *d)
1732 {
1733         struct obd_statfs sfs;
1734         int rc;
1735
1736         rc = dt_statfs(env, d->lod_child, &sfs);
1737         if (rc) {
1738                 CDEBUG(D_LAYOUT, "%s: failed to get OSD statfs, rc = %d\n",
1739                        lod2obd(d)->obd_name, rc);
1740                 return rc;
1741         }
1742
1743         /* udpate local OSD cached statfs data */
1744         spin_lock_init(&d->lod_lsfs_lock);
1745         d->lod_lsfs_age = ktime_get_seconds();
1746         d->lod_lsfs_total_mb = (sfs.os_blocks * sfs.os_bsize) >> 20;
1747         d->lod_lsfs_free_mb = (sfs.os_bfree * sfs.os_bsize) >> 20;
1748         return 0;
1749 }
1750
1751 /**
1752  * Initialize LOD device at setup.
1753  *
1754  * Initializes the given LOD device using the original configuration command.
1755  * The function initiates a connection to the local OSD and initializes few
1756  * internal structures like pools, target tables, etc.
1757  *
1758  * \param[in] env               LU environment provided by the caller
1759  * \param[in] lod               lod device
1760  * \param[in] ldt               not used
1761  * \param[in] cfg               configuration command
1762  *
1763  * \retval 0                    on success
1764  * \retval negative             negated errno on error
1765  **/
1766 static int lod_init0(const struct lu_env *env, struct lod_device *lod,
1767                      struct lu_device_type *ldt, struct lustre_cfg *cfg)
1768 {
1769         struct dt_device_param ddp;
1770         struct obd_device *obd;
1771         int rc;
1772
1773         ENTRY;
1774
1775         obd = class_name2obd(lustre_cfg_string(cfg, 0));
1776         if (!obd) {
1777                 rc = -ENODEV;
1778                 CERROR("Cannot find obd with name '%s': rc = %d\n",
1779                        lustre_cfg_string(cfg, 0), rc);
1780                 RETURN(rc);
1781         }
1782
1783         obd->obd_lu_dev = &lod->lod_dt_dev.dd_lu_dev;
1784         lod->lod_dt_dev.dd_lu_dev.ld_obd = obd;
1785         lod->lod_dt_dev.dd_lu_dev.ld_ops = &lod_lu_ops;
1786         lod->lod_dt_dev.dd_ops = &lod_dt_ops;
1787
1788         rc = lod_connect_to_osd(env, lod, cfg);
1789         if (rc)
1790                 RETURN(rc);
1791
1792         dt_conf_get(env, &lod->lod_dt_dev, &ddp);
1793         lod->lod_osd_max_easize = ddp.ddp_max_ea_size;
1794         lod->lod_dom_stripesize_max_kb = (1ULL << 10); /* 1Mb is default */
1795
1796         /* initialize local statfs cached values */
1797         rc = lod_lsfs_init(env, lod);
1798         if (rc)
1799                 GOTO(out_disconnect, rc);
1800
1801         /* default threshold as half of total space, in MiB */
1802         lod->lod_dom_threshold_free_mb = lod->lod_lsfs_total_mb / 2;
1803         /* set default DoM stripe size based on free space amount */
1804         lod_dom_stripesize_recalc(lod);
1805
1806         /* setup obd to be used with old lov code */
1807         rc = lod_pools_init(lod, cfg);
1808         if (rc)
1809                 GOTO(out_disconnect, rc);
1810
1811         rc = lod_procfs_init(lod);
1812         if (rc)
1813                 GOTO(out_pools, rc);
1814
1815         spin_lock_init(&lod->lod_lock);
1816         spin_lock_init(&lod->lod_connects_lock);
1817         lu_tgt_descs_init(&lod->lod_mdt_descs, true);
1818         lu_tgt_descs_init(&lod->lod_ost_descs, false);
1819
1820         RETURN(0);
1821
1822 out_pools:
1823         lod_pools_fini(lod);
1824 out_disconnect:
1825         obd_disconnect(lod->lod_child_exp);
1826         RETURN(rc);
1827 }
1828
1829 /**
1830  * Implementation of lu_device_type_operations::ldto_device_free() for LOD
1831  *
1832  * Releases the memory allocated for LOD device.
1833  *
1834  * see include/lu_object.h for the details.
1835  */
1836 static struct lu_device *lod_device_free(const struct lu_env *env,
1837                                          struct lu_device *lu)
1838 {
1839         struct lod_device *lod = lu2lod_dev(lu);
1840         struct lu_device  *next = &lod->lod_child->dd_lu_dev;
1841
1842         ENTRY;
1843
1844         if (atomic_read(&lu->ld_ref) > 0 &&
1845             !cfs_hash_is_empty(lu->ld_site->ls_obj_hash)) {
1846                 LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
1847                 lu_site_print(env, lu->ld_site, &msgdata, lu_cdebug_printer);
1848         }
1849         LASSERTF(atomic_read(&lu->ld_ref) == 0, "lu is %p\n", lu);
1850         dt_device_fini(&lod->lod_dt_dev);
1851         OBD_FREE_PTR(lod);
1852         RETURN(next);
1853 }
1854
1855 /**
1856  * Implementation of lu_device_type_operations::ldto_device_alloc() for LOD
1857  *
1858  * Allocates LOD device and calls the helpers to initialize it.
1859  *
1860  * see include/lu_object.h for the details.
1861  */
1862 static struct lu_device *lod_device_alloc(const struct lu_env *env,
1863                                           struct lu_device_type *type,
1864                                           struct lustre_cfg *lcfg)
1865 {
1866         struct lod_device *lod;
1867         struct lu_device *lu_dev;
1868
1869         OBD_ALLOC_PTR(lod);
1870         if (!lod) {
1871                 lu_dev = ERR_PTR(-ENOMEM);
1872         } else {
1873                 int rc;
1874
1875                 lu_dev = lod2lu_dev(lod);
1876                 dt_device_init(&lod->lod_dt_dev, type);
1877                 rc = lod_init0(env, lod, type, lcfg);
1878                 if (rc != 0) {
1879                         lod_device_free(env, lu_dev);
1880                         lu_dev = ERR_PTR(rc);
1881                 }
1882         }
1883
1884         return lu_dev;
1885 }
1886
1887 static void lod_avoid_guide_fini(struct lod_avoid_guide *lag)
1888 {
1889         if (lag->lag_oss_avoid_array)
1890                 OBD_FREE_PTR_ARRAY(lag->lag_oss_avoid_array,
1891                                    lag->lag_oaa_size);
1892         if (lag->lag_ost_avoid_bitmap)
1893                 CFS_FREE_BITMAP(lag->lag_ost_avoid_bitmap);
1894 }
1895
1896 /**
1897  * Implementation of lu_device_type_operations::ldto_device_fini() for LOD
1898  *
1899  * Releases the internal resources used by LOD device.
1900  *
1901  * see include/lu_object.h for the details.
1902  */
1903 static struct lu_device *lod_device_fini(const struct lu_env *env,
1904                                          struct lu_device *d)
1905 {
1906         struct lod_device *lod = lu2lod_dev(d);
1907         int rc;
1908
1909         ENTRY;
1910
1911         lod_pools_fini(lod);
1912
1913         lod_procfs_fini(lod);
1914
1915         rc = lod_fini_tgt(env, lod, &lod->lod_ost_descs);
1916         if (rc)
1917                 CERROR("%s: can not fini ost descriptors: rc =  %d\n",
1918                         lod2obd(lod)->obd_name, rc);
1919
1920         rc = lod_fini_tgt(env, lod, &lod->lod_mdt_descs);
1921         if (rc)
1922                 CERROR("%s: can not fini mdt descriptors: rc =  %d\n",
1923                         lod2obd(lod)->obd_name, rc);
1924
1925         RETURN(NULL);
1926 }
1927
1928 /**
1929  * Implementation of obd_ops::o_connect() for LOD
1930  *
1931  * Used to track all the users of this specific LOD device,
1932  * so the device stays up until the last user disconnected.
1933  *
1934  * \param[in] env               LU environment provided by the caller
1935  * \param[out] exp              export the caller will be using to access LOD
1936  * \param[in] obd               OBD device representing LOD device
1937  * \param[in] cluuid            unique identifier of the caller
1938  * \param[in] data              not used
1939  * \param[in] localdata         not used
1940  *
1941  * \retval 0                    on success
1942  * \retval negative             negated errno on error
1943  **/
1944 static int lod_obd_connect(const struct lu_env *env, struct obd_export **exp,
1945                            struct obd_device *obd, struct obd_uuid *cluuid,
1946                            struct obd_connect_data *data, void *localdata)
1947 {
1948         struct lod_device *lod = lu2lod_dev(obd->obd_lu_dev);
1949         struct lustre_handle conn;
1950         int rc;
1951
1952         ENTRY;
1953
1954         CDEBUG(D_CONFIG, "connect #%d\n", lod->lod_connects);
1955
1956         rc = class_connect(&conn, obd, cluuid);
1957         if (rc)
1958                 RETURN(rc);
1959
1960         *exp = class_conn2export(&conn);
1961
1962         spin_lock(&lod->lod_connects_lock);
1963         lod->lod_connects++;
1964         /* at the moment we expect the only user */
1965         LASSERT(lod->lod_connects == 1);
1966         spin_unlock(&lod->lod_connects_lock);
1967
1968         RETURN(0);
1969 }
1970
1971 /**
1972  *
1973  * Implementation of obd_ops::o_disconnect() for LOD
1974  *
1975  * When the caller doesn't need to use this LOD instance, it calls
1976  * obd_disconnect() and LOD releases corresponding export/reference count.
1977  * Once all the users gone, LOD device is released.
1978  *
1979  * \param[in] exp               export provided to the caller in obd_connect()
1980  *
1981  * \retval 0                    on success
1982  * \retval negative             negated errno on error
1983  **/
1984 static int lod_obd_disconnect(struct obd_export *exp)
1985 {
1986         struct obd_device *obd = exp->exp_obd;
1987         struct lod_device *lod = lu2lod_dev(obd->obd_lu_dev);
1988         int rc, release = 0;
1989
1990         ENTRY;
1991
1992         /* Only disconnect the underlying layers on the final disconnect. */
1993         spin_lock(&lod->lod_connects_lock);
1994         lod->lod_connects--;
1995         if (lod->lod_connects != 0) {
1996                 /* why should there be more than 1 connect? */
1997                 spin_unlock(&lod->lod_connects_lock);
1998                 CERROR("%s: disconnect #%d\n", exp->exp_obd->obd_name,
1999                        lod->lod_connects);
2000                 goto out;
2001         }
2002         spin_unlock(&lod->lod_connects_lock);
2003
2004         /* the last user of lod has gone, let's release the device */
2005         release = 1;
2006
2007 out:
2008         rc = class_disconnect(exp); /* bz 9811 */
2009
2010         if (rc == 0 && release)
2011                 class_manual_cleanup(obd);
2012         RETURN(rc);
2013 }
2014
2015 LU_KEY_INIT(lod, struct lod_thread_info);
2016
2017 static void lod_key_fini(const struct lu_context *ctx,
2018                 struct lu_context_key *key, void *data)
2019 {
2020         struct lod_thread_info *info = data;
2021         struct lod_layout_component *lds =
2022                                 info->lti_def_striping.lds_def_comp_entries;
2023
2024         /*
2025          * allocated in lod_get_lov_ea
2026          * XXX: this is overload, a tread may have such store but used only
2027          * once. Probably better would be pool of such stores per LOD.
2028          */
2029         if (info->lti_ea_store) {
2030                 OBD_FREE_LARGE(info->lti_ea_store, info->lti_ea_store_size);
2031                 info->lti_ea_store = NULL;
2032                 info->lti_ea_store_size = 0;
2033         }
2034         lu_buf_free(&info->lti_linkea_buf);
2035
2036         if (lds)
2037                 lod_free_def_comp_entries(&info->lti_def_striping);
2038
2039         if (info->lti_comp_size > 0)
2040                 OBD_FREE_PTR_ARRAY(info->lti_comp_idx,
2041                                    info->lti_comp_size);
2042
2043         lod_avoid_guide_fini(&info->lti_avoid);
2044
2045         OBD_FREE_PTR(info);
2046 }
2047
2048 /* context key: lod_thread_key */
2049 LU_CONTEXT_KEY_DEFINE(lod, LCT_MD_THREAD);
2050
2051 LU_TYPE_INIT_FINI(lod, &lod_thread_key);
2052
2053 static struct lu_device_type_operations lod_device_type_ops = {
2054         .ldto_init           = lod_type_init,
2055         .ldto_fini           = lod_type_fini,
2056
2057         .ldto_start          = lod_type_start,
2058         .ldto_stop           = lod_type_stop,
2059
2060         .ldto_device_alloc   = lod_device_alloc,
2061         .ldto_device_free    = lod_device_free,
2062
2063         .ldto_device_fini    = lod_device_fini
2064 };
2065
2066 static struct lu_device_type lod_device_type = {
2067         .ldt_tags     = LU_DEVICE_DT,
2068         .ldt_name     = LUSTRE_LOD_NAME,
2069         .ldt_ops      = &lod_device_type_ops,
2070         .ldt_ctx_tags = LCT_MD_THREAD,
2071 };
2072
2073 /**
2074  * Implementation of obd_ops::o_get_info() for LOD
2075  *
2076  * Currently, there is only one supported key: KEY_OSP_CONNECTED , to provide
2077  * the caller binary status whether LOD has seen connection to any OST target.
2078  * It will also check if the MDT update log context being initialized (if
2079  * needed).
2080  *
2081  * \param[in] env               LU environment provided by the caller
2082  * \param[in] exp               export of the caller
2083  * \param[in] keylen            len of the key
2084  * \param[in] key               the key
2085  * \param[in] vallen            not used
2086  * \param[in] val               not used
2087  *
2088  * \retval                      0 if a connection was seen
2089  * \retval                      -EAGAIN if LOD isn't running yet or no
2090  *                              connection has been seen yet
2091  * \retval                      -EINVAL if not supported key is requested
2092  **/
2093 static int lod_obd_get_info(const struct lu_env *env, struct obd_export *exp,
2094                             u32 keylen, void *key, u32 *vallen, void *val)
2095 {
2096         int rc = -EINVAL;
2097
2098         if (KEY_IS(KEY_OSP_CONNECTED)) {
2099                 struct obd_device *obd = exp->exp_obd;
2100                 struct lod_device *d;
2101                 struct lod_tgt_desc *tgt;
2102                 int rc = 1;
2103
2104                 if (!obd->obd_set_up || obd->obd_stopping)
2105                         RETURN(-EAGAIN);
2106
2107                 d = lu2lod_dev(obd->obd_lu_dev);
2108                 lod_getref(&d->lod_ost_descs);
2109                 lod_foreach_ost(d, tgt) {
2110                         rc = obd_get_info(env, tgt->ltd_exp, keylen, key,
2111                                           vallen, val);
2112                         /* one healthy device is enough */
2113                         if (rc == 0)
2114                                 break;
2115                 }
2116                 lod_putref(d, &d->lod_ost_descs);
2117
2118                 lod_getref(&d->lod_mdt_descs);
2119                 lod_foreach_mdt(d, tgt) {
2120                         struct llog_ctxt *ctxt;
2121
2122                         if (!tgt->ltd_active)
2123                                 continue;
2124
2125                         ctxt = llog_get_context(tgt->ltd_tgt->dd_lu_dev.ld_obd,
2126                                                 LLOG_UPDATELOG_ORIG_CTXT);
2127                         if (!ctxt) {
2128                                 CDEBUG(D_INFO, "%s: %s is not ready.\n",
2129                                        obd->obd_name,
2130                                       tgt->ltd_tgt->dd_lu_dev.ld_obd->obd_name);
2131                                 rc = -EAGAIN;
2132                                 break;
2133                         }
2134                         if (!ctxt->loc_handle) {
2135                                 CDEBUG(D_INFO, "%s: %s is not ready.\n",
2136                                        obd->obd_name,
2137                                       tgt->ltd_tgt->dd_lu_dev.ld_obd->obd_name);
2138                                 rc = -EAGAIN;
2139                                 llog_ctxt_put(ctxt);
2140                                 break;
2141                         }
2142                         llog_ctxt_put(ctxt);
2143                 }
2144                 lod_putref(d, &d->lod_mdt_descs);
2145
2146                 RETURN(rc);
2147         }
2148
2149         RETURN(rc);
2150 }
2151
2152 static int lod_obd_set_info_async(const struct lu_env *env,
2153                                   struct obd_export *exp,
2154                                   u32 keylen, void *key,
2155                                   u32 vallen, void *val,
2156                                   struct ptlrpc_request_set *set)
2157 {
2158         struct obd_device *obd = class_exp2obd(exp);
2159         struct lod_device *d;
2160         struct lod_tgt_desc *tgt;
2161         int no_set = 0;
2162         int rc = 0, rc2;
2163
2164         ENTRY;
2165
2166         if (!set) {
2167                 no_set = 1;
2168                 set = ptlrpc_prep_set();
2169                 if (!set)
2170                         RETURN(-ENOMEM);
2171         }
2172
2173         d = lu2lod_dev(obd->obd_lu_dev);
2174         lod_getref(&d->lod_ost_descs);
2175         lod_foreach_ost(d, tgt) {
2176                 if (!tgt->ltd_active)
2177                         continue;
2178
2179                 rc2 = obd_set_info_async(env, tgt->ltd_exp, keylen, key,
2180                                          vallen, val, set);
2181                 if (rc2 != 0 && rc == 0)
2182                         rc = rc2;
2183         }
2184         lod_putref(d, &d->lod_ost_descs);
2185
2186         lod_getref(&d->lod_mdt_descs);
2187         lod_foreach_mdt(d, tgt) {
2188                 if (!tgt->ltd_active)
2189                         continue;
2190                 rc2 = obd_set_info_async(env, tgt->ltd_exp, keylen, key,
2191                                          vallen, val, set);
2192                 if (rc2 != 0 && rc == 0)
2193                         rc = rc2;
2194         }
2195         lod_putref(d, &d->lod_mdt_descs);
2196
2197
2198         if (no_set) {
2199                 rc2 = ptlrpc_set_wait(env, set);
2200                 if (rc2 == 0 && rc == 0)
2201                         rc = rc2;
2202                 ptlrpc_set_destroy(set);
2203         }
2204         RETURN(rc);
2205 }
2206
2207 static const struct obd_ops lod_obd_device_ops = {
2208         .o_owner        = THIS_MODULE,
2209         .o_connect      = lod_obd_connect,
2210         .o_disconnect   = lod_obd_disconnect,
2211         .o_get_info     = lod_obd_get_info,
2212         .o_set_info_async = lod_obd_set_info_async,
2213         .o_pool_new     = lod_pool_new,
2214         .o_pool_rem     = lod_pool_remove,
2215         .o_pool_add     = lod_pool_add,
2216         .o_pool_del     = lod_pool_del,
2217 };
2218
2219 static int __init lod_init(void)
2220 {
2221         struct obd_type *sym;
2222         int rc;
2223
2224         rc = lu_kmem_init(lod_caches);
2225         if (rc)
2226                 return rc;
2227
2228         rc = class_register_type(&lod_obd_device_ops, NULL, true, NULL,
2229                                  LUSTRE_LOD_NAME, &lod_device_type);
2230         if (rc) {
2231                 lu_kmem_fini(lod_caches);
2232                 return rc;
2233         }
2234
2235         /* create "lov" entry for compatibility purposes */
2236         sym = class_add_symlinks(LUSTRE_LOV_NAME, true);
2237         if (IS_ERR(sym)) {
2238                 rc = PTR_ERR(sym);
2239                 /* does real "lov" already exist ? */
2240                 if (rc == -EEXIST)
2241                         rc = 0;
2242         }
2243
2244         return rc;
2245 }
2246
2247 static void __exit lod_exit(void)
2248 {
2249         struct obd_type *sym = class_search_type(LUSTRE_LOV_NAME);
2250
2251         /* if this was never fully initialized by the lov layer
2252          * then we are responsible for freeing this obd_type
2253          */
2254         if (sym) {
2255                 /* final put if we manage this obd type */
2256                 if (sym->typ_sym_filter)
2257                         kobject_put(&sym->typ_kobj);
2258                 /* put reference taken by class_search_type */
2259                 kobject_put(&sym->typ_kobj);
2260         }
2261
2262         class_unregister_type(LUSTRE_LOD_NAME);
2263         lu_kmem_fini(lod_caches);
2264 }
2265
2266 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2267 MODULE_DESCRIPTION("Lustre Logical Object Device ("LUSTRE_LOD_NAME")");
2268 MODULE_VERSION(LUSTRE_VERSION_STRING);
2269 MODULE_LICENSE("GPL");
2270
2271 module_init(lod_init);
2272 module_exit(lod_exit);