Whamcloud - gitweb
LU-8066 obd: collect all resource releasing for obj_type.
[fs/lustre-release.git] / lustre / lod / lod_dev.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/lod/lod_dev.c
33  *
34  * Lustre Logical Object Device
35  *
36  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
37  * Author: Mikhail Pershin <mike.pershin@intel.com>
38  */
39 /**
40  * The Logical Object Device (LOD) layer manages access to striped
41  * objects (both regular files and directories). It implements the DT
42  * device and object APIs and is responsible for creating, storing,
43  * and loading striping information as an extended attribute of the
44  * underlying OSD object. LOD is the server side analog of the LOV and
45  * LMV layers on the client side.
46  *
47  * Metadata LU object stack (layers of the same compound LU object,
48  * all have the same FID):
49  *
50  *        MDT
51  *         |      MD API
52  *        MDD
53  *         |      DT API
54  *        LOD
55  *       /   \    DT API
56  *     OSD   OSP
57  *
58  * During LOD object initialization the localness or remoteness of the
59  * object FID dictates the choice between OSD and OSP.
60  *
61  * An LOD object (file or directory) with N stripes (each has a
62  * different FID):
63  *
64  *          LOD
65  *           |
66  *   +---+---+---+...+
67  *   |   |   |   |   |
68  *   S0  S1  S2  S3  S(N-1)  OS[DP] objects, seen as DT objects by LOD
69  *
70  * When upper layers must access an object's stripes (which are
71  * themselves OST or MDT LU objects) LOD finds these objects by their
72  * FIDs and stores them as an array of DT object pointers on the
73  * object. Declarations and operations on LOD objects are received by
74  * LOD (as DT object operations) and performed on the underlying
75  * OS[DP] object and (as needed) on the stripes. From the perspective
76  * of LOD, a stripe-less file (created by mknod() or open with
77  * O_LOV_DELAY_CREATE) is an object which does not yet have stripes,
78  * while a non-striped directory (created by mkdir()) is an object
79  * which will never have stripes.
80  *
81  * The LOD layer also implements a small subset of the OBD device API
82  * to support MDT stack initialization and finalization (an MDD device
83  * connects and disconnects itself to and from the underlying LOD
84  * device), and pool management. In turn LOD uses the OBD device API
85  * to connect it self to the underlying OSD, and to connect itself to
86  * OSP devices representing the MDTs and OSTs that bear the stripes of
87  * its objects.
88  */
89
90 #define DEBUG_SUBSYSTEM S_MDS
91
92 #include <linux/kthread.h>
93 #include <obd_class.h>
94 #include <md_object.h>
95 #include <lustre_fid.h>
96 #include <uapi/linux/lustre/lustre_param.h>
97 #include <lustre_update.h>
98 #include <lustre_log.h>
99
100 #include "lod_internal.h"
101
102 static const char lod_update_log_name[] = "update_log";
103 static const char lod_update_log_dir_name[] = "update_log_dir";
104
105 /*
106  * Lookup target by FID.
107  *
108  * Lookup MDT/OST target index by FID. Type of the target can be
109  * specific or any.
110  *
111  * \param[in] env               LU environment provided by the caller
112  * \param[in] lod               lod device
113  * \param[in] fid               FID
114  * \param[out] tgt              result target index
115  * \param[in] type              expected type of the target:
116  *                              LU_SEQ_RANGE_{MDT,OST,ANY}
117  *
118  * \retval 0                    on success
119  * \retval negative             negated errno on error
120  **/
121 int lod_fld_lookup(const struct lu_env *env, struct lod_device *lod,
122                    const struct lu_fid *fid, u32 *tgt, int *type)
123 {
124         struct lu_seq_range range = { 0 };
125         struct lu_server_fld *server_fld;
126         int rc;
127
128         ENTRY;
129
130         if (!fid_is_sane(fid)) {
131                 CERROR("%s: invalid FID "DFID"\n", lod2obd(lod)->obd_name,
132                        PFID(fid));
133                 RETURN(-EIO);
134         }
135
136         if (fid_is_idif(fid)) {
137                 *tgt = fid_idif_ost_idx(fid);
138                 *type = LU_SEQ_RANGE_OST;
139                 RETURN(0);
140         }
141
142         if (fid_is_update_log(fid) || fid_is_update_log_dir(fid)) {
143                 *tgt = fid_oid(fid);
144                 *type = LU_SEQ_RANGE_MDT;
145                 RETURN(0);
146         }
147
148         if (!lod->lod_initialized || (!fid_seq_in_fldb(fid_seq(fid)))) {
149                 LASSERT(lu_site2seq(lod2lu_dev(lod)->ld_site) != NULL);
150
151                 *tgt = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
152                 *type = LU_SEQ_RANGE_MDT;
153                 RETURN(0);
154         }
155
156         server_fld = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_server_fld;
157         if (!server_fld)
158                 RETURN(-EIO);
159
160         fld_range_set_type(&range, *type);
161         rc = fld_server_lookup(env, server_fld, fid_seq(fid), &range);
162         if (rc != 0)
163                 RETURN(rc);
164
165         *tgt = range.lsr_index;
166         *type = range.lsr_flags;
167
168         CDEBUG(D_INFO, "%s: got tgt %x for sequence: %#llx\n",
169                lod2obd(lod)->obd_name, *tgt, fid_seq(fid));
170
171         RETURN(0);
172 }
173
174 /* Slab for OSD object allocation */
175 struct kmem_cache *lod_object_kmem;
176
177 /* Slab for dt_txn_callback */
178 struct kmem_cache *lod_txn_callback_kmem;
179 static struct lu_kmem_descr lod_caches[] = {
180         {
181                 .ckd_cache = &lod_object_kmem,
182                 .ckd_name  = "lod_obj",
183                 .ckd_size  = sizeof(struct lod_object)
184         },
185         {
186                 .ckd_cache = &lod_txn_callback_kmem,
187                 .ckd_name  = "lod_txn_callback",
188                 .ckd_size  = sizeof(struct dt_txn_callback)
189         },
190         {
191                 .ckd_cache = NULL
192         }
193 };
194
195 static struct lu_device *lod_device_fini(const struct lu_env *env,
196                                          struct lu_device *d);
197
198 /**
199  * Implementation of lu_device_operations::ldo_object_alloc() for LOD
200  *
201  * Allocates and initializes LOD's slice in the given object.
202  *
203  * see include/lu_object.h for the details.
204  */
205 static struct lu_object *lod_object_alloc(const struct lu_env *env,
206                                           const struct lu_object_header *hdr,
207                                           struct lu_device *dev)
208 {
209         struct lod_object *lod_obj;
210         struct lu_object *lu_obj;
211
212         ENTRY;
213
214         OBD_SLAB_ALLOC_PTR_GFP(lod_obj, lod_object_kmem, GFP_NOFS);
215         if (!lod_obj)
216                 RETURN(ERR_PTR(-ENOMEM));
217
218         mutex_init(&lod_obj->ldo_layout_mutex);
219         lu_obj = lod2lu_obj(lod_obj);
220         dt_object_init(&lod_obj->ldo_obj, NULL, dev);
221         lod_obj->ldo_obj.do_ops = &lod_obj_ops;
222         lu_obj->lo_ops = &lod_lu_obj_ops;
223
224         RETURN(lu_obj);
225 }
226
227 /**
228  * Process the config log for all sub device.
229  *
230  * The function goes through all the targets in the given table
231  * and apply given configuration command on to the targets.
232  * Used to cleanup the targets at unmount.
233  *
234  * \param[in] env               LU environment provided by the caller
235  * \param[in] lod               lod device
236  * \param[in] ltd               target's table to go through
237  * \param[in] lcfg              configuration command to apply
238  *
239  * \retval 0                    on success
240  * \retval negative             negated errno on error
241  **/
242 static int lod_sub_process_config(const struct lu_env *env,
243                                  struct lod_device *lod,
244                                  struct lod_tgt_descs *ltd,
245                                  struct lustre_cfg *lcfg)
246 {
247         struct lu_device  *next;
248         int rc = 0;
249         unsigned int i;
250
251         lod_getref(ltd);
252         if (ltd->ltd_tgts_size <= 0) {
253                 lod_putref(lod, ltd);
254                 return 0;
255         }
256         cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
257                 struct lod_tgt_desc *tgt;
258                 int rc1;
259
260                 tgt = LTD_TGT(ltd, i);
261                 LASSERT(tgt && tgt->ltd_tgt);
262                 next = &tgt->ltd_tgt->dd_lu_dev;
263                 rc1 = next->ld_ops->ldo_process_config(env, next, lcfg);
264                 if (rc1) {
265                         CERROR("%s: error cleaning up LOD index %u: cmd %#x : rc = %d\n",
266                                lod2obd(lod)->obd_name, i, lcfg->lcfg_command,
267                                rc1);
268                         rc = rc1;
269                 }
270         }
271         lod_putref(lod, ltd);
272         return rc;
273 }
274
275 struct lod_recovery_data {
276         struct lod_device       *lrd_lod;
277         struct lod_tgt_desc     *lrd_ltd;
278         struct ptlrpc_thread    *lrd_thread;
279         u32                     lrd_idx;
280 };
281
282
283 /**
284  * process update recovery record
285  *
286  * Add the update recovery recode to the update recovery list in
287  * lod_recovery_data. Then the recovery thread (target_recovery_thread)
288  * will redo these updates.
289  *
290  * \param[in]env        execution environment
291  * \param[in]llh        log handle of update record
292  * \param[in]rec        update record to be replayed
293  * \param[in]data       update recovery data which holds the necessary
294  *                      arguments for recovery (see struct lod_recovery_data)
295  *
296  * \retval              0 if the record is processed successfully.
297  * \retval              negative errno if the record processing fails.
298  */
299 static int lod_process_recovery_updates(const struct lu_env *env,
300                                         struct llog_handle *llh,
301                                         struct llog_rec_hdr *rec,
302                                         void *data)
303 {
304         struct lod_recovery_data *lrd = data;
305         struct llog_cookie *cookie = &lod_env_info(env)->lti_cookie;
306         struct lu_target *lut;
307         u32 index = 0;
308
309         ENTRY;
310
311         if (!lrd->lrd_ltd) {
312                 int rc;
313
314                 rc = lodname2mdt_index(lod2obd(lrd->lrd_lod)->obd_name, &index);
315                 if (rc != 0)
316                         return rc;
317         } else {
318                 index = lrd->lrd_ltd->ltd_index;
319         }
320
321         if (rec->lrh_len !=
322                 llog_update_record_size((struct llog_update_record *)rec)) {
323                 CERROR("%s: broken update record! index %u "DFID".%u: rc = %d\n",
324                        lod2obd(lrd->lrd_lod)->obd_name, index,
325                        PFID(&llh->lgh_id.lgl_oi.oi_fid), rec->lrh_index, -EIO);
326                 return -EINVAL;
327         }
328
329         cookie->lgc_lgl = llh->lgh_id;
330         cookie->lgc_index = rec->lrh_index;
331         cookie->lgc_subsys = LLOG_UPDATELOG_ORIG_CTXT;
332
333         CDEBUG(D_HA, "%s: process recovery updates "DFID".%u\n",
334                lod2obd(lrd->lrd_lod)->obd_name,
335                PFID(&llh->lgh_id.lgl_oi.oi_fid), rec->lrh_index);
336         lut = lod2lu_dev(lrd->lrd_lod)->ld_site->ls_tgt;
337
338         if (lut->lut_obd->obd_stopping ||
339             lut->lut_obd->obd_abort_recovery)
340                 return -ESHUTDOWN;
341
342         return insert_update_records_to_replay_list(lut->lut_tdtd,
343                                         (struct llog_update_record *)rec,
344                                         cookie, index);
345 }
346
347 /**
348  * recovery thread for update log
349  *
350  * Start recovery thread and prepare the sub llog, then it will retrieve
351  * the update records from the correpondent MDT and do recovery.
352  *
353  * \param[in] arg       pointer to the recovery data
354  *
355  * \retval              0 if recovery succeeds
356  * \retval              negative errno if recovery failed.
357  */
358 static int lod_sub_recovery_thread(void *arg)
359 {
360         struct lod_recovery_data *lrd = arg;
361         struct lod_device *lod = lrd->lrd_lod;
362         struct dt_device *dt;
363         struct ptlrpc_thread *thread = lrd->lrd_thread;
364         struct llog_ctxt *ctxt = NULL;
365         struct lu_env env;
366         struct lu_target *lut;
367         struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
368         struct lod_tgt_desc *tgt = NULL;
369         time64_t start;
370         int retries = 0;
371         int i;
372         int rc;
373
374         ENTRY;
375
376         thread->t_flags = SVC_RUNNING;
377         wake_up(&thread->t_ctl_waitq);
378
379         rc = lu_env_init(&env, LCT_LOCAL | LCT_MD_THREAD);
380         if (rc != 0) {
381                 OBD_FREE_PTR(lrd);
382                 CERROR("%s: can't initialize env: rc = %d\n",
383                        lod2obd(lod)->obd_name, rc);
384                 RETURN(rc);
385         }
386
387         lut = lod2lu_dev(lod)->ld_site->ls_tgt;
388         atomic_inc(&lut->lut_tdtd->tdtd_recovery_threads_count);
389         if (!lrd->lrd_ltd)
390                 dt = lod->lod_child;
391         else
392                 dt = lrd->lrd_ltd->ltd_tgt;
393
394         start = ktime_get_real_seconds();
395
396 again:
397         rc = lod_sub_prep_llog(&env, lod, dt, lrd->lrd_idx);
398         if (!rc && !lod->lod_child->dd_rdonly) {
399                 /* Process the recovery record */
400                 ctxt = llog_get_context(dt->dd_lu_dev.ld_obd,
401                                         LLOG_UPDATELOG_ORIG_CTXT);
402                 LASSERT(ctxt != NULL);
403                 LASSERT(ctxt->loc_handle != NULL);
404
405                 rc = llog_cat_process(&env, ctxt->loc_handle,
406                                       lod_process_recovery_updates, lrd, 0, 0);
407         }
408
409         if (rc < 0) {
410                 struct lu_device *top_device;
411
412                 top_device = lod->lod_dt_dev.dd_lu_dev.ld_site->ls_top_dev;
413                 /*
414                  * Because the remote target might failover at the same time,
415                  * let's retry here
416                  */
417                 if ((rc == -ETIMEDOUT || rc == -EAGAIN || rc == -EIO) &&
418                      dt != lod->lod_child &&
419                     !top_device->ld_obd->obd_abort_recovery &&
420                     !top_device->ld_obd->obd_stopping) {
421                         if (ctxt) {
422                                 if (ctxt->loc_handle)
423                                         llog_cat_close(&env,
424                                                        ctxt->loc_handle);
425                                 llog_ctxt_put(ctxt);
426                         }
427                         retries++;
428                         CDEBUG(D_HA, "%s get update log failed %d, retry\n",
429                                dt->dd_lu_dev.ld_obd->obd_name, rc);
430                         goto again;
431                 }
432
433                 CERROR("%s get update log failed: rc = %d\n",
434                        dt->dd_lu_dev.ld_obd->obd_name, rc);
435                 llog_ctxt_put(ctxt);
436
437                 spin_lock(&top_device->ld_obd->obd_dev_lock);
438                 if (!top_device->ld_obd->obd_abort_recovery &&
439                     !top_device->ld_obd->obd_stopping)
440                         top_device->ld_obd->obd_abort_recovery = 1;
441                 spin_unlock(&top_device->ld_obd->obd_dev_lock);
442
443                 GOTO(out, rc);
444         }
445         llog_ctxt_put(ctxt);
446
447         CDEBUG(D_HA, "%s retrieved update log, duration %lld, retries %d\n",
448                dt->dd_lu_dev.ld_obd->obd_name, ktime_get_real_seconds() - start,
449                retries);
450
451         spin_lock(&lod->lod_lock);
452         if (!lrd->lrd_ltd)
453                 lod->lod_child_got_update_log = 1;
454         else
455                 lrd->lrd_ltd->ltd_got_update_log = 1;
456
457         if (!lod->lod_child_got_update_log) {
458                 spin_unlock(&lod->lod_lock);
459                 GOTO(out, rc = 0);
460         }
461
462         cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
463                 tgt = LTD_TGT(ltd, i);
464                 if (!tgt->ltd_got_update_log) {
465                         spin_unlock(&lod->lod_lock);
466                         GOTO(out, rc = 0);
467                 }
468         }
469         lut->lut_tdtd->tdtd_replay_ready = 1;
470         spin_unlock(&lod->lod_lock);
471
472         CDEBUG(D_HA, "%s got update logs from all MDTs.\n",
473                lut->lut_obd->obd_name);
474         wake_up(&lut->lut_obd->obd_next_transno_waitq);
475         EXIT;
476
477 out:
478         OBD_FREE_PTR(lrd);
479         thread->t_flags = SVC_STOPPED;
480         atomic_dec(&lut->lut_tdtd->tdtd_recovery_threads_count);
481         wake_up(&lut->lut_tdtd->tdtd_recovery_threads_waitq);
482         wake_up(&thread->t_ctl_waitq);
483         lu_env_fini(&env);
484         return rc;
485 }
486
487 /**
488  * finish sub llog context
489  *
490  * Stop update recovery thread for the sub device, then cleanup the
491  * correspondent llog ctxt.
492  *
493  * \param[in] env      execution environment
494  * \param[in] lod      lod device to do update recovery
495  * \param[in] thread   recovery thread on this sub device
496  */
497 void lod_sub_fini_llog(const struct lu_env *env,
498                        struct dt_device *dt, struct ptlrpc_thread *thread)
499 {
500         struct obd_device *obd;
501         struct llog_ctxt *ctxt;
502
503         ENTRY;
504
505         obd = dt->dd_lu_dev.ld_obd;
506         CDEBUG(D_INFO, "%s: finish sub llog\n", obd->obd_name);
507         /* Stop recovery thread first */
508         if (thread && thread->t_flags & SVC_RUNNING) {
509                 thread->t_flags = SVC_STOPPING;
510                 wake_up(&thread->t_ctl_waitq);
511                 wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
512         }
513
514         ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT);
515         if (!ctxt)
516                 RETURN_EXIT;
517
518         if (ctxt->loc_handle)
519                 llog_cat_close(env, ctxt->loc_handle);
520
521         llog_cleanup(env, ctxt);
522
523         RETURN_EXIT;
524 }
525
526 /**
527  * Extract MDT target index from a device name.
528  *
529  * a helper function to extract index from the given device name
530  * like "fsname-MDTxxxx-mdtlov"
531  *
532  * \param[in] lodname           device name
533  * \param[out] mdt_index        extracted index
534  *
535  * \retval 0            on success
536  * \retval -EINVAL      if the name is invalid
537  */
538 int lodname2mdt_index(char *lodname, u32 *mdt_index)
539 {
540         unsigned long index;
541         char *ptr, *tmp;
542         int rc;
543
544         /* 1.8 configs don't have "-MDT0000" at the end */
545         ptr = strstr(lodname, "-MDT");
546         if (!ptr) {
547                 *mdt_index = 0;
548                 return 0;
549         }
550
551         ptr = strrchr(lodname, '-');
552         if (!ptr) {
553                 rc = -EINVAL;
554                 CERROR("invalid MDT index in '%s': rc = %d\n", lodname, rc);
555                 return rc;
556         }
557
558         if (strncmp(ptr, "-mdtlov", 7) != 0) {
559                 rc = -EINVAL;
560                 CERROR("invalid MDT index in '%s': rc = %d\n", lodname, rc);
561                 return rc;
562         }
563
564         if ((unsigned long)ptr - (unsigned long)lodname <= 8) {
565                 rc = -EINVAL;
566                 CERROR("invalid MDT index in '%s': rc = %d\n", lodname, rc);
567                 return rc;
568         }
569
570         if (strncmp(ptr - 8, "-MDT", 4) != 0) {
571                 rc = -EINVAL;
572                 CERROR("invalid MDT index in '%s': rc = %d\n", lodname, rc);
573                 return rc;
574         }
575
576         index = simple_strtol(ptr - 4, &tmp, 16);
577         if (*tmp != '-' || index > INT_MAX) {
578                 rc = -EINVAL;
579                 CERROR("invalid MDT index in '%s': rc = %d\n", lodname, rc);
580                 return rc;
581         }
582         *mdt_index = index;
583         return 0;
584 }
585
586 /**
587  * Init sub llog context
588  *
589  * Setup update llog ctxt for update recovery threads, then start the
590  * recovery thread (lod_sub_recovery_thread) to read update llog from
591  * the correspondent MDT to do update recovery.
592  *
593  * \param[in] env       execution environment
594  * \param[in] lod       lod device to do update recovery
595  * \param[in] dt        sub dt device for which the recovery thread is
596  *
597  * \retval              0 if initialization succeeds.
598  * \retval              negative errno if initialization fails.
599  */
600 int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod,
601                       struct dt_device *dt)
602 {
603         struct obd_device *obd;
604         struct lod_recovery_data *lrd = NULL;
605         struct ptlrpc_thread *thread;
606         struct task_struct *task;
607         struct l_wait_info lwi = { 0 };
608         struct lod_tgt_desc *sub_ltd = NULL;
609         u32 index;
610         u32 master_index;
611         int rc;
612
613         ENTRY;
614
615         rc = lodname2mdt_index(lod2obd(lod)->obd_name, &master_index);
616         if (rc != 0)
617                 RETURN(rc);
618
619         OBD_ALLOC_PTR(lrd);
620         if (!lrd)
621                 RETURN(-ENOMEM);
622
623         if (lod->lod_child == dt) {
624                 thread = &lod->lod_child_recovery_thread;
625                 index = master_index;
626         } else {
627                 struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
628                 struct lod_tgt_desc *tgt = NULL;
629                 unsigned int i;
630
631                 cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
632                         tgt = LTD_TGT(ltd, i);
633                         if (tgt->ltd_tgt == dt) {
634                                 index = tgt->ltd_index;
635                                 sub_ltd = tgt;
636                                 break;
637                         }
638                 }
639                 LASSERT(sub_ltd != NULL);
640                 OBD_ALLOC_PTR(sub_ltd->ltd_recovery_thread);
641                 if (!sub_ltd->ltd_recovery_thread)
642                         GOTO(free_lrd, rc = -ENOMEM);
643
644                 thread = sub_ltd->ltd_recovery_thread;
645         }
646
647         CDEBUG(D_INFO, "%s init sub log %s\n", lod2obd(lod)->obd_name,
648                dt->dd_lu_dev.ld_obd->obd_name);
649         lrd->lrd_lod = lod;
650         lrd->lrd_ltd = sub_ltd;
651         lrd->lrd_thread = thread;
652         lrd->lrd_idx = index;
653         init_waitqueue_head(&thread->t_ctl_waitq);
654
655         obd = dt->dd_lu_dev.ld_obd;
656         obd->obd_lvfs_ctxt.dt = dt;
657         rc = llog_setup(env, obd, &obd->obd_olg, LLOG_UPDATELOG_ORIG_CTXT,
658                         NULL, &llog_common_cat_ops);
659         if (rc < 0) {
660                 CERROR("%s: cannot setup updatelog llog: rc = %d\n",
661                        obd->obd_name, rc);
662                 GOTO(free_thread, rc);
663         }
664
665         /* Start the recovery thread */
666         task = kthread_run(lod_sub_recovery_thread, lrd, "lod%04x_rec%04x",
667                            master_index, index);
668         if (IS_ERR(task)) {
669                 rc = PTR_ERR(task);
670                 CERROR("%s: cannot start recovery thread: rc = %d\n",
671                        obd->obd_name, rc);
672                 GOTO(out_llog, rc);
673         }
674
675         l_wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING ||
676                                           thread->t_flags & SVC_STOPPED, &lwi);
677
678         RETURN(0);
679 out_llog:
680         lod_sub_fini_llog(env, dt, thread);
681 free_thread:
682         if (lod->lod_child != dt) {
683                 OBD_FREE_PTR(sub_ltd->ltd_recovery_thread);
684                 sub_ltd->ltd_recovery_thread = NULL;
685         }
686 free_lrd:
687         OBD_FREE_PTR(lrd);
688         RETURN(rc);
689 }
690
691 /**
692  * Stop sub recovery thread
693  *
694  * Stop sub recovery thread on all subs.
695  *
696  * \param[in] env       execution environment
697  * \param[in] lod       lod device to do update recovery
698  */
699 static void lod_sub_stop_recovery_threads(const struct lu_env *env,
700                                           struct lod_device *lod)
701 {
702         struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
703         struct ptlrpc_thread *thread;
704         unsigned int i;
705
706         /*
707          * Stop the update log commit cancel threads and finish master
708          * llog ctxt
709          */
710         thread = &lod->lod_child_recovery_thread;
711         /* Stop recovery thread first */
712         if (thread && thread->t_flags & SVC_RUNNING) {
713                 thread->t_flags = SVC_STOPPING;
714                 wake_up(&thread->t_ctl_waitq);
715                 wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
716         }
717
718         lod_getref(ltd);
719         cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
720                 struct lod_tgt_desc     *tgt;
721
722                 tgt = LTD_TGT(ltd, i);
723                 thread = tgt->ltd_recovery_thread;
724                 if (thread && thread->t_flags & SVC_RUNNING) {
725                         thread->t_flags = SVC_STOPPING;
726                         wake_up(&thread->t_ctl_waitq);
727                         wait_event(thread->t_ctl_waitq,
728                                    thread->t_flags & SVC_STOPPED);
729                         OBD_FREE_PTR(tgt->ltd_recovery_thread);
730                         tgt->ltd_recovery_thread = NULL;
731                 }
732         }
733
734         lod_putref(lod, ltd);
735 }
736
737 /**
738  * finish all sub llog
739  *
740  * cleanup all of sub llog ctxt on the LOD.
741  *
742  * \param[in] env       execution environment
743  * \param[in] lod       lod device to do update recovery
744  */
745 static void lod_sub_fini_all_llogs(const struct lu_env *env,
746                                    struct lod_device *lod)
747 {
748         struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
749         unsigned int i;
750
751         /*
752          * Stop the update log commit cancel threads and finish master
753          * llog ctxt
754          */
755         lod_sub_fini_llog(env, lod->lod_child,
756                           &lod->lod_child_recovery_thread);
757         lod_getref(ltd);
758         cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
759                 struct lod_tgt_desc *tgt;
760
761                 tgt = LTD_TGT(ltd, i);
762                 lod_sub_fini_llog(env, tgt->ltd_tgt,
763                                   tgt->ltd_recovery_thread);
764         }
765
766         lod_putref(lod, ltd);
767 }
768
769 static char *lod_show_update_logs_retrievers(void *data, int *size, int *count)
770 {
771         struct lod_device *lod = (struct lod_device *)data;
772         struct lu_target *lut = lod2lu_dev(lod)->ld_site->ls_tgt;
773         struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
774         struct lod_tgt_desc *tgt = NULL;
775         char *buf;
776         int len = 0;
777         int rc;
778         int i;
779
780         *count = atomic_read(&lut->lut_tdtd->tdtd_recovery_threads_count);
781         if (*count == 0) {
782                 *size = 0;
783                 return NULL;
784         }
785
786         *size = 5 * *count + 1;
787         OBD_ALLOC(buf, *size);
788         if (!buf)
789                 return NULL;
790
791         *count = 0;
792         memset(buf, 0, *size);
793
794         if (!lod->lod_child_got_update_log) {
795                 rc = lodname2mdt_index(lod2obd(lod)->obd_name, &i);
796                 LASSERTF(rc == 0, "Fail to parse target index: rc = %d\n", rc);
797
798                 rc = snprintf(buf + len, *size - len, " %04x", i);
799                 LASSERT(rc > 0);
800
801                 len += rc;
802                 (*count)++;
803         }
804
805         cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
806                 tgt = LTD_TGT(ltd, i);
807                 if (!tgt->ltd_got_update_log) {
808                         rc = snprintf(buf + len, *size - len, " %04x", i);
809                         if (unlikely(rc <= 0))
810                                 break;
811
812                         len += rc;
813                         (*count)++;
814                 }
815         }
816
817         return buf;
818 }
819
820 /**
821  * Prepare distribute txn
822  *
823  * Prepare distribute txn structure for LOD
824  *
825  * \param[in] env       execution environment
826  * \param[in] lod_device  LOD device
827  *
828  * \retval              0 if preparation succeeds.
829  * \retval              negative errno if preparation fails.
830  */
831 static int lod_prepare_distribute_txn(const struct lu_env *env,
832                                       struct lod_device *lod)
833 {
834         struct target_distribute_txn_data *tdtd;
835         struct lu_target *lut;
836         int rc;
837
838         ENTRY;
839
840         /* Init update recovery data */
841         OBD_ALLOC_PTR(tdtd);
842         if (!tdtd)
843                 RETURN(-ENOMEM);
844
845         lut = lod2lu_dev(lod)->ld_site->ls_tgt;
846         tdtd->tdtd_dt = &lod->lod_dt_dev;
847         rc = distribute_txn_init(env, lut, tdtd,
848                 lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id);
849
850         if (rc < 0) {
851                 CERROR("%s: cannot init distribute txn: rc = %d\n",
852                        lod2obd(lod)->obd_name, rc);
853                 OBD_FREE_PTR(tdtd);
854                 RETURN(rc);
855         }
856
857         tdtd->tdtd_show_update_logs_retrievers =
858                 lod_show_update_logs_retrievers;
859         tdtd->tdtd_show_retrievers_cbdata = lod;
860
861         lut->lut_tdtd = tdtd;
862
863         RETURN(0);
864 }
865
866 /**
867  * Finish distribute txn
868  *
869  * Release the resource holding by distribute txn, i.e. stop distribute
870  * txn thread.
871  *
872  * \param[in] env       execution environment
873  * \param[in] lod       lod device
874  */
875 static void lod_fini_distribute_txn(const struct lu_env *env,
876                                     struct lod_device *lod)
877 {
878         struct lu_target *lut;
879
880         lut = lod2lu_dev(lod)->ld_site->ls_tgt;
881         target_recovery_fini(lut->lut_obd);
882         if (!lut->lut_tdtd)
883                 return;
884
885         distribute_txn_fini(env, lut->lut_tdtd);
886
887         OBD_FREE_PTR(lut->lut_tdtd);
888         lut->lut_tdtd = NULL;
889 }
890
891 /**
892  * Implementation of lu_device_operations::ldo_process_config() for LOD
893  *
894  * The method is called by the configuration subsystem during setup,
895  * cleanup and when the configuration changes. The method processes
896  * few specific commands like adding/removing the targets, changing
897  * the runtime parameters.
898
899  * \param[in] env               LU environment provided by the caller
900  * \param[in] dev               lod device
901  * \param[in] lcfg              configuration command to apply
902  *
903  * \retval 0                    on success
904  * \retval negative             negated errno on error
905  *
906  * The examples are below.
907  *
908  * Add osc config log:
909  * marker  20 (flags=0x01, v2.2.49.56) lustre-OST0001  'add osc'
910  * add_uuid  nid=192.168.122.162@tcp(0x20000c0a87aa2)  0:  1:nidxxx
911  * attach    0:lustre-OST0001-osc-MDT0001  1:osc  2:lustre-MDT0001-mdtlov_UUID
912  * setup     0:lustre-OST0001-osc-MDT0001  1:lustre-OST0001_UUID  2:nid
913  * lov_modify_tgts add 0:lustre-MDT0001-mdtlov  1:lustre-OST0001_UUID  2:1  3:1
914  * marker  20 (flags=0x02, v2.2.49.56) lustre-OST0001  'add osc'
915  *
916  * Add mdc config log:
917  * marker  10 (flags=0x01, v2.2.49.56) lustre-MDT0000  'add osp'
918  * add_uuid  nid=192.168.122.162@tcp(0x20000c0a87aa2)  0:  1:nid
919  * attach 0:lustre-MDT0000-osp-MDT0001  1:osp  2:lustre-MDT0001-mdtlov_UUID
920  * setup     0:lustre-MDT0000-osp-MDT0001  1:lustre-MDT0000_UUID  2:nid
921  * modify_mdc_tgts add 0:lustre-MDT0001  1:lustre-MDT0000_UUID  2:0  3:1
922  * marker  10 (flags=0x02, v2.2.49.56) lustre-MDT0000_UUID  'add osp'
923  */
924 static int lod_process_config(const struct lu_env *env,
925                               struct lu_device *dev,
926                               struct lustre_cfg *lcfg)
927 {
928         struct lod_device *lod = lu2lod_dev(dev);
929         struct lu_device *next = &lod->lod_child->dd_lu_dev;
930         char *arg1;
931         int rc = 0;
932
933         ENTRY;
934
935         switch (lcfg->lcfg_command) {
936         case LCFG_LOV_DEL_OBD:
937         case LCFG_LOV_ADD_INA:
938         case LCFG_LOV_ADD_OBD:
939         case LCFG_ADD_MDC: {
940                 u32 index;
941                 u32 mdt_index;
942                 int gen;
943                 /*
944                  * lov_modify_tgts add  0:lov_mdsA  1:osp  2:0  3:1
945                  * modify_mdc_tgts add  0:lustre-MDT0001
946                  *                    1:lustre-MDT0001-mdc0002
947                  *                    2:2  3:1
948                  */
949                 arg1 = lustre_cfg_string(lcfg, 1);
950
951                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
952                         GOTO(out, rc = -EINVAL);
953                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
954                         GOTO(out, rc = -EINVAL);
955
956                 if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
957                         u32 mdt_index;
958
959                         rc = lodname2mdt_index(lustre_cfg_string(lcfg, 0),
960                                                &mdt_index);
961                         if (rc != 0)
962                                 GOTO(out, rc);
963
964                         rc = lod_add_device(env, lod, arg1, index, gen,
965                                             mdt_index, LUSTRE_OSC_NAME, 1);
966                 } else if (lcfg->lcfg_command == LCFG_ADD_MDC) {
967                         mdt_index = index;
968                         rc = lod_add_device(env, lod, arg1, index, gen,
969                                             mdt_index, LUSTRE_MDC_NAME, 1);
970                 } else if (lcfg->lcfg_command == LCFG_LOV_ADD_INA) {
971                         /*FIXME: Add mdt_index for LCFG_LOV_ADD_INA*/
972                         mdt_index = 0;
973                         rc = lod_add_device(env, lod, arg1, index, gen,
974                                             mdt_index, LUSTRE_OSC_NAME, 0);
975                 } else {
976                         rc = lod_del_device(env, lod,
977                                             &lod->lod_ost_descs,
978                                             arg1, index, gen, true);
979                 }
980
981                 break;
982         }
983
984         case LCFG_PARAM: {
985                 struct obd_device *obd;
986                 ssize_t count;
987                 char *param;
988
989                 /*
990                  * Check if it is activate/deactivate mdc
991                  * lustre-MDTXXXX-osp-MDTXXXX.active=1
992                  */
993                 param = lustre_cfg_buf(lcfg, 1);
994                 if (strstr(param, "osp") && strstr(param, ".active=")) {
995                         struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
996                         struct lod_tgt_desc *sub_tgt = NULL;
997                         char *ptr;
998                         char *tmp;
999                         int i;
1000
1001                         ptr = strstr(param, ".");
1002                         *ptr = '\0';
1003                         obd = class_name2obd(param);
1004                         if (!obd) {
1005                                 CERROR("%s: can not find %s: rc = %d\n",
1006                                        lod2obd(lod)->obd_name, param, -EINVAL);
1007                                 *ptr = '.';
1008                                 GOTO(out, rc);
1009                         }
1010
1011                         cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
1012                                 struct lod_tgt_desc *tgt;
1013
1014                                 tgt = LTD_TGT(ltd, i);
1015                                 if (tgt->ltd_tgt->dd_lu_dev.ld_obd == obd) {
1016                                         sub_tgt = tgt;
1017                                         break;
1018                                 }
1019                         }
1020
1021                         if (!sub_tgt) {
1022                                 CERROR("%s: can not find %s: rc = %d\n",
1023                                        lod2obd(lod)->obd_name, param, -EINVAL);
1024                                 *ptr = '.';
1025                                 GOTO(out, rc);
1026                         }
1027
1028                         *ptr = '.';
1029                         tmp = strstr(param, "=");
1030                         tmp++;
1031                         if (*tmp == '1') {
1032                                 struct llog_ctxt *ctxt;
1033
1034                                 obd = sub_tgt->ltd_tgt->dd_lu_dev.ld_obd;
1035                                 ctxt = llog_get_context(obd,
1036                                                 LLOG_UPDATELOG_ORIG_CTXT);
1037                                 if (!ctxt) {
1038                                         rc = llog_setup(env, obd, &obd->obd_olg,
1039                                                        LLOG_UPDATELOG_ORIG_CTXT,
1040                                                     NULL, &llog_common_cat_ops);
1041                                         if (rc < 0)
1042                                                 GOTO(out, rc);
1043                                 } else {
1044                                         llog_ctxt_put(ctxt);
1045                                 }
1046                                 rc = lod_sub_prep_llog(env, lod,
1047                                                        sub_tgt->ltd_tgt,
1048                                                        sub_tgt->ltd_index);
1049                                 if (rc == 0)
1050                                         sub_tgt->ltd_active = 1;
1051                         } else {
1052                                 lod_sub_fini_llog(env, sub_tgt->ltd_tgt,
1053                                                   NULL);
1054                                 sub_tgt->ltd_active = 0;
1055                         }
1056                         GOTO(out, rc);
1057                 }
1058
1059
1060                 if (strstr(param, PARAM_LOD) != NULL)
1061                         count = class_modify_config(lcfg, PARAM_LOD,
1062                                                     &lod->lod_dt_dev.dd_kobj);
1063                 else
1064                         count = class_modify_config(lcfg, PARAM_LOV,
1065                                                     &lod->lod_dt_dev.dd_kobj);
1066                 rc = count > 0 ? 0 : count;
1067                 GOTO(out, rc);
1068         }
1069         case LCFG_PRE_CLEANUP: {
1070                 lod_sub_process_config(env, lod, &lod->lod_mdt_descs, lcfg);
1071                 lod_sub_process_config(env, lod, &lod->lod_ost_descs, lcfg);
1072                 next = &lod->lod_child->dd_lu_dev;
1073                 rc = next->ld_ops->ldo_process_config(env, next, lcfg);
1074                 if (rc != 0)
1075                         CDEBUG(D_HA, "%s: can't process %u: %d\n",
1076                                lod2obd(lod)->obd_name, lcfg->lcfg_command, rc);
1077
1078                 lod_sub_stop_recovery_threads(env, lod);
1079                 lod_fini_distribute_txn(env, lod);
1080                 lod_sub_fini_all_llogs(env, lod);
1081                 break;
1082         }
1083         case LCFG_CLEANUP: {
1084                 if (lod->lod_md_root) {
1085                         dt_object_put(env, &lod->lod_md_root->ldo_obj);
1086                         lod->lod_md_root = NULL;
1087                 }
1088
1089                 /*
1090                  * do cleanup on underlying storage only when
1091                  * all OSPs are cleaned up, as they use that OSD as well
1092                  */
1093                 lu_dev_del_linkage(dev->ld_site, dev);
1094                 lod_sub_process_config(env, lod, &lod->lod_mdt_descs, lcfg);
1095                 lod_sub_process_config(env, lod, &lod->lod_ost_descs, lcfg);
1096                 next = &lod->lod_child->dd_lu_dev;
1097                 rc = next->ld_ops->ldo_process_config(env, next, lcfg);
1098                 if (rc)
1099                         CERROR("%s: can't process %u: rc = %d\n",
1100                                lod2obd(lod)->obd_name, lcfg->lcfg_command, rc);
1101
1102                 rc = obd_disconnect(lod->lod_child_exp);
1103                 if (rc)
1104                         CERROR("error in disconnect from storage: rc = %d\n",
1105                                rc);
1106                 break;
1107         }
1108         default:
1109                 CERROR("%s: unknown command %u\n", lod2obd(lod)->obd_name,
1110                        lcfg->lcfg_command);
1111                 rc = -EINVAL;
1112                 break;
1113         }
1114
1115 out:
1116         RETURN(rc);
1117 }
1118
1119 /**
1120  * Implementation of lu_device_operations::ldo_recovery_complete() for LOD
1121  *
1122  * The method is called once the recovery is complete. This implementation
1123  * distributes the notification to all the known targets.
1124  *
1125  * see include/lu_object.h for the details
1126  */
1127 static int lod_recovery_complete(const struct lu_env *env,
1128                                  struct lu_device *dev)
1129 {
1130         struct lod_device *lod = lu2lod_dev(dev);
1131         struct lu_device *next = &lod->lod_child->dd_lu_dev;
1132         unsigned int i;
1133         int rc;
1134
1135         ENTRY;
1136
1137         LASSERT(lod->lod_recovery_completed == 0);
1138         lod->lod_recovery_completed = 1;
1139
1140         rc = next->ld_ops->ldo_recovery_complete(env, next);
1141
1142         lod_getref(&lod->lod_ost_descs);
1143         if (lod->lod_osts_size > 0) {
1144                 cfs_foreach_bit(lod->lod_ost_bitmap, i) {
1145                         struct lod_tgt_desc *tgt;
1146
1147                         tgt = OST_TGT(lod, i);
1148                         LASSERT(tgt && tgt->ltd_tgt);
1149                         next = &tgt->ltd_ost->dd_lu_dev;
1150                         rc = next->ld_ops->ldo_recovery_complete(env, next);
1151                         if (rc)
1152                                 CERROR("%s: can't complete recovery on #%d: rc = %d\n",
1153                                        lod2obd(lod)->obd_name, i, rc);
1154                 }
1155         }
1156         lod_putref(lod, &lod->lod_ost_descs);
1157         RETURN(rc);
1158 }
1159
1160 /**
1161  * Init update logs on all sub device
1162  *
1163  * LOD initialize update logs on all of sub devices. Because the initialization
1164  * process might need FLD lookup, see llog_osd_open()->dt_locate()->...->
1165  * lod_object_init(), this API has to be called after LOD is initialized.
1166  * \param[in] env       execution environment
1167  * \param[in] lod       lod device
1168  *
1169  * \retval              0 if update log is initialized successfully.
1170  * \retval              negative errno if initialization fails.
1171  */
1172 static int lod_sub_init_llogs(const struct lu_env *env, struct lod_device *lod)
1173 {
1174         struct lod_tgt_descs *ltd = &lod->lod_mdt_descs;
1175         int rc;
1176         unsigned int i;
1177
1178         ENTRY;
1179
1180         /*
1181          * llog must be setup after LOD is initialized, because llog
1182          * initialization include FLD lookup
1183          */
1184         LASSERT(lod->lod_initialized);
1185
1186         /* Init the llog in its own stack */
1187         rc = lod_sub_init_llog(env, lod, lod->lod_child);
1188         if (rc < 0)
1189                 RETURN(rc);
1190
1191         cfs_foreach_bit(ltd->ltd_tgt_bitmap, i) {
1192                 struct lod_tgt_desc *tgt;
1193
1194                 tgt = LTD_TGT(ltd, i);
1195                 rc = lod_sub_init_llog(env, lod, tgt->ltd_tgt);
1196                 if (rc != 0)
1197                         break;
1198         }
1199
1200         RETURN(rc);
1201 }
1202
1203 /**
1204  * Implementation of lu_device_operations::ldo_prepare() for LOD
1205  *
1206  * see include/lu_object.h for the details.
1207  */
1208 static int lod_prepare(const struct lu_env *env, struct lu_device *pdev,
1209                        struct lu_device *cdev)
1210 {
1211         struct lod_device *lod = lu2lod_dev(cdev);
1212         struct lu_device *next = &lod->lod_child->dd_lu_dev;
1213         struct lu_fid *fid = &lod_env_info(env)->lti_fid;
1214         int rc;
1215         struct dt_object *root;
1216         struct dt_object *dto;
1217         u32 index;
1218
1219         ENTRY;
1220
1221         rc = next->ld_ops->ldo_prepare(env, pdev, next);
1222         if (rc != 0) {
1223                 CERROR("%s: prepare bottom error: rc = %d\n",
1224                        lod2obd(lod)->obd_name, rc);
1225                 RETURN(rc);
1226         }
1227
1228         lod->lod_initialized = 1;
1229
1230         rc = dt_root_get(env, lod->lod_child, fid);
1231         if (rc < 0)
1232                 RETURN(rc);
1233
1234         root = dt_locate(env, lod->lod_child, fid);
1235         if (IS_ERR(root))
1236                 RETURN(PTR_ERR(root));
1237
1238         /* Create update log object */
1239         index = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
1240         lu_update_log_fid(fid, index);
1241
1242         dto = local_file_find_or_create_with_fid(env, lod->lod_child,
1243                                                  fid, root,
1244                                                  lod_update_log_name,
1245                                                  S_IFREG | 0644);
1246         if (IS_ERR(dto))
1247                 GOTO(out_put, rc = PTR_ERR(dto));
1248
1249         dt_object_put(env, dto);
1250
1251         /* Create update log dir */
1252         lu_update_log_dir_fid(fid, index);
1253         dto = local_file_find_or_create_with_fid(env, lod->lod_child,
1254                                                  fid, root,
1255                                                  lod_update_log_dir_name,
1256                                                  S_IFDIR | 0644);
1257         if (IS_ERR(dto))
1258                 GOTO(out_put, rc = PTR_ERR(dto));
1259
1260         dt_object_put(env, dto);
1261
1262         rc = lod_prepare_distribute_txn(env, lod);
1263         if (rc != 0)
1264                 GOTO(out_put, rc);
1265
1266         rc = lod_sub_init_llogs(env, lod);
1267         if (rc != 0)
1268                 GOTO(out_put, rc);
1269
1270 out_put:
1271         dt_object_put(env, root);
1272
1273         RETURN(rc);
1274 }
1275
1276 const struct lu_device_operations lod_lu_ops = {
1277         .ldo_object_alloc       = lod_object_alloc,
1278         .ldo_process_config     = lod_process_config,
1279         .ldo_recovery_complete  = lod_recovery_complete,
1280         .ldo_prepare            = lod_prepare,
1281 };
1282
1283 /**
1284  * Implementation of dt_device_operations::dt_root_get() for LOD
1285  *
1286  * see include/dt_object.h for the details.
1287  */
1288 static int lod_root_get(const struct lu_env *env,
1289                         struct dt_device *dev, struct lu_fid *f)
1290 {
1291         return dt_root_get(env, dt2lod_dev(dev)->lod_child, f);
1292 }
1293
1294 static void lod_statfs_sum(struct obd_statfs *sfs,
1295                              struct obd_statfs *ost_sfs, int *bs)
1296 {
1297         while (ost_sfs->os_bsize < *bs) {
1298                 *bs >>= 1;
1299                 sfs->os_bsize >>= 1;
1300                 sfs->os_bavail <<= 1;
1301                 sfs->os_blocks <<= 1;
1302                 sfs->os_bfree <<= 1;
1303                 sfs->os_granted <<= 1;
1304         }
1305         while (ost_sfs->os_bsize > *bs) {
1306                 ost_sfs->os_bsize >>= 1;
1307                 ost_sfs->os_bavail <<= 1;
1308                 ost_sfs->os_blocks <<= 1;
1309                 ost_sfs->os_bfree <<= 1;
1310                 ost_sfs->os_granted <<= 1;
1311         }
1312         sfs->os_bavail += ost_sfs->os_bavail;
1313         sfs->os_blocks += ost_sfs->os_blocks;
1314         sfs->os_bfree += ost_sfs->os_bfree;
1315         sfs->os_granted += ost_sfs->os_granted;
1316 }
1317
1318 /**
1319  * Implementation of dt_device_operations::dt_statfs() for LOD
1320  *
1321  * see include/dt_object.h for the details.
1322  */
1323 static int lod_statfs(const struct lu_env *env,
1324                       struct dt_device *dev, struct obd_statfs *sfs)
1325 {
1326         struct lod_device *lod = dt2lod_dev(dev);
1327         struct lod_ost_desc *ost;
1328         struct lod_mdt_desc *mdt;
1329         struct obd_statfs ost_sfs;
1330         u64 ost_files = 0;
1331         u64 ost_ffree = 0;
1332         int i, rc, bs;
1333
1334         rc = dt_statfs(env, dt2lod_dev(dev)->lod_child, sfs);
1335         if (rc)
1336                 GOTO(out, rc);
1337
1338         bs = sfs->os_bsize;
1339
1340         sfs->os_bavail = 0;
1341         sfs->os_blocks = 0;
1342         sfs->os_bfree = 0;
1343         sfs->os_granted = 0;
1344
1345         lod_getref(&lod->lod_mdt_descs);
1346         lod_foreach_mdt(lod, i) {
1347                 mdt = MDT_TGT(lod, i);
1348                 LASSERT(mdt && mdt->ltd_mdt);
1349                 rc = dt_statfs(env, mdt->ltd_mdt, &ost_sfs);
1350                 /* ignore errors */
1351                 if (rc)
1352                         continue;
1353                 sfs->os_files += ost_sfs.os_files;
1354                 sfs->os_ffree += ost_sfs.os_ffree;
1355                 lod_statfs_sum(sfs, &ost_sfs, &bs);
1356         }
1357         lod_putref(lod, &lod->lod_mdt_descs);
1358
1359         /*
1360          * at some point we can check whether DoM is enabled and
1361          * decide how to account MDT space. for simplicity let's
1362          * just fallback to pre-DoM policy if any OST is alive
1363          */
1364         lod_getref(&lod->lod_ost_descs);
1365         lod_foreach_ost(lod, i) {
1366                 ost = OST_TGT(lod, i);
1367                 LASSERT(ost && ost->ltd_ost);
1368                 rc = dt_statfs(env, ost->ltd_ost, &ost_sfs);
1369                 /* ignore errors */
1370                 if (rc || ost_sfs.os_bsize == 0)
1371                         continue;
1372                 if (!ost_files) {
1373                         /*
1374                          * if only MDTs with DoM then report only MDT blocks,
1375                          * otherwise show only OST blocks, and DoM is "free"
1376                          */
1377                         sfs->os_bavail = 0;
1378                         sfs->os_blocks = 0;
1379                         sfs->os_bfree = 0;
1380                         sfs->os_granted = 0;
1381                 }
1382                 ost_files += ost_sfs.os_files;
1383                 ost_ffree += ost_sfs.os_ffree;
1384                 ost_sfs.os_bavail += ost_sfs.os_granted;
1385                 lod_statfs_sum(sfs, &ost_sfs, &bs);
1386                 LASSERTF(bs == ost_sfs.os_bsize, "%d != %d\n",
1387                         (int)sfs->os_bsize, (int)ost_sfs.os_bsize);
1388         }
1389         lod_putref(lod, &lod->lod_ost_descs);
1390         sfs->os_state |= OS_STATE_SUM;
1391
1392         /* If we have _some_ OSTs, but don't have as many free objects on the
1393          * OSTs as inodes on the MDTs, reduce the reported number of inodes
1394          * to compensate, so that the "inodes in use" number is correct.
1395          * This should be kept in sync with ll_statfs_internal().
1396          */
1397         if (ost_files && ost_ffree < sfs->os_ffree) {
1398                 sfs->os_files = (sfs->os_files - sfs->os_ffree) + ost_ffree;
1399                 sfs->os_ffree = ost_ffree;
1400         }
1401
1402         /* a single successful statfs should be enough */
1403         rc = 0;
1404
1405 out:
1406         RETURN(rc);
1407 }
1408
1409 /**
1410  * Implementation of dt_device_operations::dt_trans_create() for LOD
1411  *
1412  * Creates a transaction using local (to this node) OSD.
1413  *
1414  * see include/dt_object.h for the details.
1415  */
1416 static struct thandle *lod_trans_create(const struct lu_env *env,
1417                                         struct dt_device *dt)
1418 {
1419         struct thandle *th;
1420
1421         th = top_trans_create(env, dt2lod_dev(dt)->lod_child);
1422         if (IS_ERR(th))
1423                 return th;
1424
1425         th->th_dev = dt;
1426
1427         return th;
1428 }
1429
1430 /**
1431  * Implementation of dt_device_operations::dt_trans_start() for LOD
1432  *
1433  * Starts the set of local transactions using the targets involved
1434  * in declare phase. Initial support for the distributed transactions.
1435  *
1436  * see include/dt_object.h for the details.
1437  */
1438 static int lod_trans_start(const struct lu_env *env, struct dt_device *dt,
1439                            struct thandle *th)
1440 {
1441         return top_trans_start(env, dt2lod_dev(dt)->lod_child, th);
1442 }
1443
1444 static int lod_trans_cb_add(struct thandle *th,
1445                             struct dt_txn_commit_cb *dcb)
1446 {
1447         struct top_thandle      *top_th = container_of(th, struct top_thandle,
1448                                                        tt_super);
1449         return dt_trans_cb_add(top_th->tt_master_sub_thandle, dcb);
1450 }
1451
1452 /**
1453  * add noop update to the update records
1454  *
1455  * Add noop updates to the update records, which is only used in
1456  * test right now.
1457  *
1458  * \param[in] env       execution environment
1459  * \param[in] dt        dt device of lod
1460  * \param[in] th        thandle
1461  * \param[in] count     the count of update records to be added.
1462  *
1463  * \retval              0 if adding succeeds.
1464  * \retval              negative errno if adding fails.
1465  */
1466 static int lod_add_noop_records(const struct lu_env *env,
1467                                 struct dt_device *dt, struct thandle *th,
1468                                 int count)
1469 {
1470         struct top_thandle *top_th;
1471         struct lu_fid *fid = &lod_env_info(env)->lti_fid;
1472         int i;
1473         int rc = 0;
1474
1475         top_th = container_of(th, struct top_thandle, tt_super);
1476         if (!top_th->tt_multiple_thandle)
1477                 return 0;
1478
1479         fid_zero(fid);
1480         for (i = 0; i < count; i++) {
1481                 rc = update_record_pack(noop, th, fid);
1482                 if (rc < 0)
1483                         return rc;
1484         }
1485         return rc;
1486 }
1487
1488 /**
1489  * Implementation of dt_device_operations::dt_trans_stop() for LOD
1490  *
1491  * Stops the set of local transactions using the targets involved
1492  * in declare phase. Initial support for the distributed transactions.
1493  *
1494  * see include/dt_object.h for the details.
1495  */
1496 static int lod_trans_stop(const struct lu_env *env, struct dt_device *dt,
1497                           struct thandle *th)
1498 {
1499         if (OBD_FAIL_CHECK(OBD_FAIL_SPLIT_UPDATE_REC)) {
1500                 int rc;
1501
1502                 rc = lod_add_noop_records(env, dt, th, 5000);
1503                 if (rc < 0)
1504                         RETURN(rc);
1505         }
1506         return top_trans_stop(env, dt2lod_dev(dt)->lod_child, th);
1507 }
1508
1509 /**
1510  * Implementation of dt_device_operations::dt_conf_get() for LOD
1511  *
1512  * Currently returns the configuration provided by the local OSD.
1513  *
1514  * see include/dt_object.h for the details.
1515  */
1516 static void lod_conf_get(const struct lu_env *env,
1517                          const struct dt_device *dev,
1518                          struct dt_device_param *param)
1519 {
1520         dt_conf_get(env, dt2lod_dev((struct dt_device *)dev)->lod_child, param);
1521 }
1522
1523 /**
1524  * Implementation of dt_device_operations::dt_sync() for LOD
1525  *
1526  * Syncs all known OST targets. Very very expensive and used
1527  * rarely by LFSCK now. Should not be used in general.
1528  *
1529  * see include/dt_object.h for the details.
1530  */
1531 static int lod_sync(const struct lu_env *env, struct dt_device *dev)
1532 {
1533         struct lod_device *lod = dt2lod_dev(dev);
1534         struct lod_ost_desc *ost;
1535         struct lod_mdt_desc *mdt;
1536         unsigned int i;
1537         int rc = 0;
1538
1539         ENTRY;
1540
1541         lod_getref(&lod->lod_ost_descs);
1542         lod_foreach_ost(lod, i) {
1543                 ost = OST_TGT(lod, i);
1544                 LASSERT(ost && ost->ltd_ost);
1545                 if (!ost->ltd_active)
1546                         continue;
1547                 rc = dt_sync(env, ost->ltd_ost);
1548                 if (rc) {
1549                         if (rc != -ENOTCONN) {
1550                                 CERROR("%s: can't sync ost %u: rc = %d\n",
1551                                        lod2obd(lod)->obd_name, i, rc);
1552                                 break;
1553                         }
1554                         rc = 0;
1555                 }
1556         }
1557         lod_putref(lod, &lod->lod_ost_descs);
1558
1559         if (rc)
1560                 RETURN(rc);
1561
1562         lod_getref(&lod->lod_mdt_descs);
1563         lod_foreach_mdt(lod, i) {
1564                 mdt = MDT_TGT(lod, i);
1565                 LASSERT(mdt && mdt->ltd_mdt);
1566                 if (!mdt->ltd_active)
1567                         continue;
1568                 rc = dt_sync(env, mdt->ltd_mdt);
1569                 if (rc) {
1570                         if (rc != -ENOTCONN) {
1571                                 CERROR("%s: can't sync mdt %u: rc = %d\n",
1572                                        lod2obd(lod)->obd_name, i, rc);
1573                                 break;
1574                         }
1575                         rc = 0;
1576                 }
1577         }
1578         lod_putref(lod, &lod->lod_mdt_descs);
1579
1580         if (rc == 0)
1581                 rc = dt_sync(env, lod->lod_child);
1582
1583         RETURN(rc);
1584 }
1585
1586 /**
1587  * Implementation of dt_device_operations::dt_ro() for LOD
1588  *
1589  * Turns local OSD read-only, used for the testing only.
1590  *
1591  * see include/dt_object.h for the details.
1592  */
1593 static int lod_ro(const struct lu_env *env, struct dt_device *dev)
1594 {
1595         return dt_ro(env, dt2lod_dev(dev)->lod_child);
1596 }
1597
1598 /**
1599  * Implementation of dt_device_operations::dt_commit_async() for LOD
1600  *
1601  * Asks local OSD to commit sooner.
1602  *
1603  * see include/dt_object.h for the details.
1604  */
1605 static int lod_commit_async(const struct lu_env *env, struct dt_device *dev)
1606 {
1607         return dt_commit_async(env, dt2lod_dev(dev)->lod_child);
1608 }
1609
1610 static const struct dt_device_operations lod_dt_ops = {
1611         .dt_root_get         = lod_root_get,
1612         .dt_statfs           = lod_statfs,
1613         .dt_trans_create     = lod_trans_create,
1614         .dt_trans_start      = lod_trans_start,
1615         .dt_trans_stop       = lod_trans_stop,
1616         .dt_conf_get         = lod_conf_get,
1617         .dt_sync             = lod_sync,
1618         .dt_ro               = lod_ro,
1619         .dt_commit_async     = lod_commit_async,
1620         .dt_trans_cb_add     = lod_trans_cb_add,
1621 };
1622
1623 /**
1624  * Connect to a local OSD.
1625  *
1626  * Used to connect to the local OSD at mount. OSD name is taken from the
1627  * configuration command passed. This connection is used to identify LU
1628  * site and pin the OSD from early removal.
1629  *
1630  * \param[in] env               LU environment provided by the caller
1631  * \param[in] lod               lod device
1632  * \param[in] cfg               configuration command to apply
1633  *
1634  * \retval 0                    on success
1635  * \retval negative             negated errno on error
1636  **/
1637 static int lod_connect_to_osd(const struct lu_env *env, struct lod_device *lod,
1638                               struct lustre_cfg *cfg)
1639 {
1640         struct obd_connect_data *data = NULL;
1641         struct obd_device *obd;
1642         char *nextdev = NULL, *p, *s;
1643         int rc, len = 0;
1644
1645         ENTRY;
1646
1647         LASSERT(cfg);
1648         LASSERT(lod->lod_child_exp == NULL);
1649
1650         /*
1651          * compatibility hack: we still use old config logs
1652          * which specify LOV, but we need to learn underlying
1653          * OSD device, which is supposed to be:
1654          *  <fsname>-MDTxxxx-osd
1655          *
1656          * 2.x MGS generates lines like the following:
1657          *   #03 (176)lov_setup 0:lustre-MDT0000-mdtlov  1:(struct lov_desc)
1658          * 1.8 MGS generates lines like the following:
1659          *   #03 (168)lov_setup 0:lustre-mdtlov  1:(struct lov_desc)
1660          *
1661          * we use "-MDT" to differentiate 2.x from 1.8
1662          */
1663         p = lustre_cfg_string(cfg, 0);
1664         if (p && strstr(p, "-mdtlov")) {
1665                 len = strlen(p) + 6;
1666                 OBD_ALLOC(nextdev, len);
1667                 if (!nextdev)
1668                         GOTO(out, rc = -ENOMEM);
1669
1670                 strcpy(nextdev, p);
1671                 s = strstr(nextdev, "-mdtlov");
1672                 if (unlikely(!s)) {
1673                         CERROR("%s: unable to parse device name: rc = %d\n",
1674                                lustre_cfg_string(cfg, 0), -EINVAL);
1675                         GOTO(out, rc = -EINVAL);
1676                 }
1677
1678                 if (strstr(nextdev, "-MDT")) {
1679                         /* 2.x config */
1680                         strcpy(s, "-osd");
1681                 } else {
1682                         /* 1.8 config */
1683                         strcpy(s, "-MDT0000-osd");
1684                 }
1685         } else {
1686                 CERROR("%s: unable to parse device name: rc = %d\n",
1687                        lustre_cfg_string(cfg, 0), -EINVAL);
1688                 GOTO(out, rc = -EINVAL);
1689         }
1690
1691         OBD_ALLOC_PTR(data);
1692         if (!data)
1693                 GOTO(out, rc = -ENOMEM);
1694
1695         obd = class_name2obd(nextdev);
1696         if (!obd) {
1697                 CERROR("%s: can not locate next device: rc = %d\n",
1698                        nextdev, -ENOTCONN);
1699                 GOTO(out, rc = -ENOTCONN);
1700         }
1701
1702         data->ocd_connect_flags = OBD_CONNECT_VERSION;
1703         data->ocd_version = LUSTRE_VERSION_CODE;
1704
1705         rc = obd_connect(env, &lod->lod_child_exp, obd, &obd->obd_uuid,
1706                          data, NULL);
1707         if (rc) {
1708                 CERROR("%s: cannot connect to next dev: rc = %d\n",
1709                        nextdev, rc);
1710                 GOTO(out, rc);
1711         }
1712
1713         lod->lod_dt_dev.dd_lu_dev.ld_site =
1714                 lod->lod_child_exp->exp_obd->obd_lu_dev->ld_site;
1715         LASSERT(lod->lod_dt_dev.dd_lu_dev.ld_site);
1716         lod->lod_child = lu2dt_dev(lod->lod_child_exp->exp_obd->obd_lu_dev);
1717
1718 out:
1719         if (data)
1720                 OBD_FREE_PTR(data);
1721         if (nextdev)
1722                 OBD_FREE(nextdev, len);
1723         RETURN(rc);
1724 }
1725
1726 /**
1727  * Allocate and initialize target table.
1728  *
1729  * A helper function to initialize the target table and allocate
1730  * a bitmap of the available targets.
1731  *
1732  * \param[in] ltd               target's table to initialize
1733  *
1734  * \retval 0                    on success
1735  * \retval negative             negated errno on error
1736  **/
1737 static int lod_tgt_desc_init(struct lod_tgt_descs *ltd)
1738 {
1739         mutex_init(&ltd->ltd_mutex);
1740         init_rwsem(&ltd->ltd_rw_sem);
1741
1742         /*
1743          * the OST array and bitmap are allocated/grown dynamically as OSTs are
1744          * added to the LOD, see lod_add_device()
1745          */
1746         ltd->ltd_tgt_bitmap = CFS_ALLOCATE_BITMAP(32);
1747         if (!ltd->ltd_tgt_bitmap)
1748                 RETURN(-ENOMEM);
1749
1750         ltd->ltd_tgts_size  = 32;
1751         ltd->ltd_tgtnr      = 0;
1752
1753         ltd->ltd_death_row = 0;
1754         ltd->ltd_refcount  = 0;
1755         return 0;
1756 }
1757
1758 /**
1759  * Initialize LOD device at setup.
1760  *
1761  * Initializes the given LOD device using the original configuration command.
1762  * The function initiates a connection to the local OSD and initializes few
1763  * internal structures like pools, target tables, etc.
1764  *
1765  * \param[in] env               LU environment provided by the caller
1766  * \param[in] lod               lod device
1767  * \param[in] ldt               not used
1768  * \param[in] cfg               configuration command
1769  *
1770  * \retval 0                    on success
1771  * \retval negative             negated errno on error
1772  **/
1773 static int lod_init0(const struct lu_env *env, struct lod_device *lod,
1774                      struct lu_device_type *ldt, struct lustre_cfg *cfg)
1775 {
1776         struct dt_device_param ddp;
1777         struct obd_device *obd;
1778         int rc;
1779
1780         ENTRY;
1781
1782         obd = class_name2obd(lustre_cfg_string(cfg, 0));
1783         if (!obd) {
1784                 rc = -ENODEV;
1785                 CERROR("Cannot find obd with name '%s': rc = %d\n",
1786                        lustre_cfg_string(cfg, 0), rc);
1787                 RETURN(rc);
1788         }
1789
1790         obd->obd_lu_dev = &lod->lod_dt_dev.dd_lu_dev;
1791         lod->lod_dt_dev.dd_lu_dev.ld_obd = obd;
1792         lod->lod_dt_dev.dd_lu_dev.ld_ops = &lod_lu_ops;
1793         lod->lod_dt_dev.dd_ops = &lod_dt_ops;
1794
1795         rc = lod_connect_to_osd(env, lod, cfg);
1796         if (rc)
1797                 RETURN(rc);
1798
1799         dt_conf_get(env, &lod->lod_dt_dev, &ddp);
1800         lod->lod_osd_max_easize = ddp.ddp_max_ea_size;
1801         lod->lod_dom_max_stripesize = (1ULL << 20); /* 1Mb as default value */
1802
1803         /* setup obd to be used with old lov code */
1804         rc = lod_pools_init(lod, cfg);
1805         if (rc)
1806                 GOTO(out_disconnect, rc);
1807
1808         rc = lod_procfs_init(lod);
1809         if (rc)
1810                 GOTO(out_pools, rc);
1811
1812         spin_lock_init(&lod->lod_lock);
1813         spin_lock_init(&lod->lod_connects_lock);
1814         lod_tgt_desc_init(&lod->lod_mdt_descs);
1815         lod_tgt_desc_init(&lod->lod_ost_descs);
1816
1817         RETURN(0);
1818
1819 out_pools:
1820         lod_pools_fini(lod);
1821 out_disconnect:
1822         obd_disconnect(lod->lod_child_exp);
1823         RETURN(rc);
1824 }
1825
1826 /**
1827  * Implementation of lu_device_type_operations::ldto_device_free() for LOD
1828  *
1829  * Releases the memory allocated for LOD device.
1830  *
1831  * see include/lu_object.h for the details.
1832  */
1833 static struct lu_device *lod_device_free(const struct lu_env *env,
1834                                          struct lu_device *lu)
1835 {
1836         struct lod_device *lod = lu2lod_dev(lu);
1837         struct lu_device  *next = &lod->lod_child->dd_lu_dev;
1838
1839         ENTRY;
1840
1841         if (atomic_read(&lu->ld_ref) > 0 &&
1842             !cfs_hash_is_empty(lu->ld_site->ls_obj_hash)) {
1843                 LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
1844                 lu_site_print(env, lu->ld_site, &msgdata, lu_cdebug_printer);
1845         }
1846         LASSERTF(atomic_read(&lu->ld_ref) == 0, "lu is %p\n", lu);
1847         dt_device_fini(&lod->lod_dt_dev);
1848         OBD_FREE_PTR(lod);
1849         RETURN(next);
1850 }
1851
1852 /**
1853  * Implementation of lu_device_type_operations::ldto_device_alloc() for LOD
1854  *
1855  * Allocates LOD device and calls the helpers to initialize it.
1856  *
1857  * see include/lu_object.h for the details.
1858  */
1859 static struct lu_device *lod_device_alloc(const struct lu_env *env,
1860                                           struct lu_device_type *type,
1861                                           struct lustre_cfg *lcfg)
1862 {
1863         struct lod_device *lod;
1864         struct lu_device *lu_dev;
1865
1866         OBD_ALLOC_PTR(lod);
1867         if (!lod) {
1868                 lu_dev = ERR_PTR(-ENOMEM);
1869         } else {
1870                 int rc;
1871
1872                 lu_dev = lod2lu_dev(lod);
1873                 dt_device_init(&lod->lod_dt_dev, type);
1874                 rc = lod_init0(env, lod, type, lcfg);
1875                 if (rc != 0) {
1876                         lod_device_free(env, lu_dev);
1877                         lu_dev = ERR_PTR(rc);
1878                 }
1879         }
1880
1881         return lu_dev;
1882 }
1883
1884 static void lod_avoid_guide_fini(struct lod_avoid_guide *lag)
1885 {
1886         if (lag->lag_oss_avoid_array)
1887                 OBD_FREE(lag->lag_oss_avoid_array,
1888                          sizeof(u32) * lag->lag_oaa_size);
1889         if (lag->lag_ost_avoid_bitmap)
1890                 CFS_FREE_BITMAP(lag->lag_ost_avoid_bitmap);
1891 }
1892
1893 /**
1894  * Implementation of lu_device_type_operations::ldto_device_fini() for LOD
1895  *
1896  * Releases the internal resources used by LOD device.
1897  *
1898  * see include/lu_object.h for the details.
1899  */
1900 static struct lu_device *lod_device_fini(const struct lu_env *env,
1901                                          struct lu_device *d)
1902 {
1903         struct lod_device *lod = lu2lod_dev(d);
1904         int rc;
1905
1906         ENTRY;
1907
1908         lod_pools_fini(lod);
1909
1910         lod_procfs_fini(lod);
1911
1912         rc = lod_fini_tgt(env, lod, &lod->lod_ost_descs, true);
1913         if (rc)
1914                 CERROR("%s: can not fini ost descriptors: rc =  %d\n",
1915                         lod2obd(lod)->obd_name, rc);
1916
1917         rc = lod_fini_tgt(env, lod, &lod->lod_mdt_descs, false);
1918         if (rc)
1919                 CERROR("%s: can not fini mdt descriptors: rc =  %d\n",
1920                         lod2obd(lod)->obd_name, rc);
1921
1922         RETURN(NULL);
1923 }
1924
1925 /**
1926  * Implementation of obd_ops::o_connect() for LOD
1927  *
1928  * Used to track all the users of this specific LOD device,
1929  * so the device stays up until the last user disconnected.
1930  *
1931  * \param[in] env               LU environment provided by the caller
1932  * \param[out] exp              export the caller will be using to access LOD
1933  * \param[in] obd               OBD device representing LOD device
1934  * \param[in] cluuid            unique identifier of the caller
1935  * \param[in] data              not used
1936  * \param[in] localdata         not used
1937  *
1938  * \retval 0                    on success
1939  * \retval negative             negated errno on error
1940  **/
1941 static int lod_obd_connect(const struct lu_env *env, struct obd_export **exp,
1942                            struct obd_device *obd, struct obd_uuid *cluuid,
1943                            struct obd_connect_data *data, void *localdata)
1944 {
1945         struct lod_device *lod = lu2lod_dev(obd->obd_lu_dev);
1946         struct lustre_handle conn;
1947         int rc;
1948
1949         ENTRY;
1950
1951         CDEBUG(D_CONFIG, "connect #%d\n", lod->lod_connects);
1952
1953         rc = class_connect(&conn, obd, cluuid);
1954         if (rc)
1955                 RETURN(rc);
1956
1957         *exp = class_conn2export(&conn);
1958
1959         spin_lock(&lod->lod_connects_lock);
1960         lod->lod_connects++;
1961         /* at the moment we expect the only user */
1962         LASSERT(lod->lod_connects == 1);
1963         spin_unlock(&lod->lod_connects_lock);
1964
1965         RETURN(0);
1966 }
1967
1968 /**
1969  *
1970  * Implementation of obd_ops::o_disconnect() for LOD
1971  *
1972  * When the caller doesn't need to use this LOD instance, it calls
1973  * obd_disconnect() and LOD releases corresponding export/reference count.
1974  * Once all the users gone, LOD device is released.
1975  *
1976  * \param[in] exp               export provided to the caller in obd_connect()
1977  *
1978  * \retval 0                    on success
1979  * \retval negative             negated errno on error
1980  **/
1981 static int lod_obd_disconnect(struct obd_export *exp)
1982 {
1983         struct obd_device *obd = exp->exp_obd;
1984         struct lod_device *lod = lu2lod_dev(obd->obd_lu_dev);
1985         int rc, release = 0;
1986
1987         ENTRY;
1988
1989         /* Only disconnect the underlying layers on the final disconnect. */
1990         spin_lock(&lod->lod_connects_lock);
1991         lod->lod_connects--;
1992         if (lod->lod_connects != 0) {
1993                 /* why should there be more than 1 connect? */
1994                 spin_unlock(&lod->lod_connects_lock);
1995                 CERROR("%s: disconnect #%d\n", exp->exp_obd->obd_name,
1996                        lod->lod_connects);
1997                 goto out;
1998         }
1999         spin_unlock(&lod->lod_connects_lock);
2000
2001         /* the last user of lod has gone, let's release the device */
2002         release = 1;
2003
2004 out:
2005         rc = class_disconnect(exp); /* bz 9811 */
2006
2007         if (rc == 0 && release)
2008                 class_manual_cleanup(obd);
2009         RETURN(rc);
2010 }
2011
2012 LU_KEY_INIT(lod, struct lod_thread_info);
2013
2014 static void lod_key_fini(const struct lu_context *ctx,
2015                 struct lu_context_key *key, void *data)
2016 {
2017         struct lod_thread_info *info = data;
2018         struct lod_layout_component *lds =
2019                                 info->lti_def_striping.lds_def_comp_entries;
2020
2021         /*
2022          * allocated in lod_get_lov_ea
2023          * XXX: this is overload, a tread may have such store but used only
2024          * once. Probably better would be pool of such stores per LOD.
2025          */
2026         if (info->lti_ea_store) {
2027                 OBD_FREE_LARGE(info->lti_ea_store, info->lti_ea_store_size);
2028                 info->lti_ea_store = NULL;
2029                 info->lti_ea_store_size = 0;
2030         }
2031         lu_buf_free(&info->lti_linkea_buf);
2032
2033         if (lds)
2034                 lod_free_def_comp_entries(&info->lti_def_striping);
2035
2036         if (info->lti_comp_size > 0)
2037                 OBD_FREE(info->lti_comp_idx,
2038                          info->lti_comp_size * sizeof(u32));
2039
2040         lod_avoid_guide_fini(&info->lti_avoid);
2041
2042         OBD_FREE_PTR(info);
2043 }
2044
2045 /* context key: lod_thread_key */
2046 LU_CONTEXT_KEY_DEFINE(lod, LCT_MD_THREAD);
2047
2048 LU_TYPE_INIT_FINI(lod, &lod_thread_key);
2049
2050 static struct lu_device_type_operations lod_device_type_ops = {
2051         .ldto_init           = lod_type_init,
2052         .ldto_fini           = lod_type_fini,
2053
2054         .ldto_start          = lod_type_start,
2055         .ldto_stop           = lod_type_stop,
2056
2057         .ldto_device_alloc   = lod_device_alloc,
2058         .ldto_device_free    = lod_device_free,
2059
2060         .ldto_device_fini    = lod_device_fini
2061 };
2062
2063 static struct lu_device_type lod_device_type = {
2064         .ldt_tags     = LU_DEVICE_DT,
2065         .ldt_name     = LUSTRE_LOD_NAME,
2066         .ldt_ops      = &lod_device_type_ops,
2067         .ldt_ctx_tags = LCT_MD_THREAD,
2068 };
2069
2070 /**
2071  * Implementation of obd_ops::o_get_info() for LOD
2072  *
2073  * Currently, there is only one supported key: KEY_OSP_CONNECTED , to provide
2074  * the caller binary status whether LOD has seen connection to any OST target.
2075  * It will also check if the MDT update log context being initialized (if
2076  * needed).
2077  *
2078  * \param[in] env               LU environment provided by the caller
2079  * \param[in] exp               export of the caller
2080  * \param[in] keylen            len of the key
2081  * \param[in] key               the key
2082  * \param[in] vallen            not used
2083  * \param[in] val               not used
2084  *
2085  * \retval                      0 if a connection was seen
2086  * \retval                      -EAGAIN if LOD isn't running yet or no
2087  *                              connection has been seen yet
2088  * \retval                      -EINVAL if not supported key is requested
2089  **/
2090 static int lod_obd_get_info(const struct lu_env *env, struct obd_export *exp,
2091                             u32 keylen, void *key, u32 *vallen, void *val)
2092 {
2093         int rc = -EINVAL;
2094
2095         if (KEY_IS(KEY_OSP_CONNECTED)) {
2096                 struct obd_device *obd = exp->exp_obd;
2097                 struct lod_device *d;
2098                 struct lod_tgt_desc *tgt;
2099                 unsigned int i;
2100                 int rc = 1;
2101
2102                 if (!obd->obd_set_up || obd->obd_stopping)
2103                         RETURN(-EAGAIN);
2104
2105                 d = lu2lod_dev(obd->obd_lu_dev);
2106                 lod_getref(&d->lod_ost_descs);
2107                 lod_foreach_ost(d, i) {
2108                         tgt = OST_TGT(d, i);
2109                         LASSERT(tgt && tgt->ltd_tgt);
2110                         rc = obd_get_info(env, tgt->ltd_exp, keylen, key,
2111                                           vallen, val);
2112                         /* one healthy device is enough */
2113                         if (rc == 0)
2114                                 break;
2115                 }
2116                 lod_putref(d, &d->lod_ost_descs);
2117
2118                 lod_getref(&d->lod_mdt_descs);
2119                 lod_foreach_mdt(d, i) {
2120                         struct llog_ctxt *ctxt;
2121
2122                         tgt = MDT_TGT(d, i);
2123                         LASSERT(tgt != NULL);
2124                         LASSERT(tgt->ltd_tgt != NULL);
2125                         if (!tgt->ltd_active)
2126                                 continue;
2127
2128                         ctxt = llog_get_context(tgt->ltd_tgt->dd_lu_dev.ld_obd,
2129                                                 LLOG_UPDATELOG_ORIG_CTXT);
2130                         if (!ctxt) {
2131                                 CDEBUG(D_INFO, "%s: %s is not ready.\n",
2132                                        obd->obd_name,
2133                                       tgt->ltd_tgt->dd_lu_dev.ld_obd->obd_name);
2134                                 rc = -EAGAIN;
2135                                 break;
2136                         }
2137                         if (!ctxt->loc_handle) {
2138                                 CDEBUG(D_INFO, "%s: %s is not ready.\n",
2139                                        obd->obd_name,
2140                                       tgt->ltd_tgt->dd_lu_dev.ld_obd->obd_name);
2141                                 rc = -EAGAIN;
2142                                 llog_ctxt_put(ctxt);
2143                                 break;
2144                         }
2145                         llog_ctxt_put(ctxt);
2146                 }
2147                 lod_putref(d, &d->lod_mdt_descs);
2148
2149                 RETURN(rc);
2150         }
2151
2152         RETURN(rc);
2153 }
2154
2155 static int lod_obd_set_info_async(const struct lu_env *env,
2156                                   struct obd_export *exp,
2157                                   u32 keylen, void *key,
2158                                   u32 vallen, void *val,
2159                                   struct ptlrpc_request_set *set)
2160 {
2161         struct obd_device *obd = class_exp2obd(exp);
2162         struct lod_device *d;
2163         struct lod_tgt_desc *tgt;
2164         int no_set = 0;
2165         int i, rc = 0, rc2;
2166
2167         ENTRY;
2168
2169         if (!set) {
2170                 no_set = 1;
2171                 set = ptlrpc_prep_set();
2172                 if (!set)
2173                         RETURN(-ENOMEM);
2174         }
2175
2176         d = lu2lod_dev(obd->obd_lu_dev);
2177         lod_getref(&d->lod_ost_descs);
2178         lod_foreach_ost(d, i) {
2179                 tgt = OST_TGT(d, i);
2180                 LASSERT(tgt && tgt->ltd_tgt);
2181                 if (!tgt->ltd_active)
2182                         continue;
2183
2184                 rc2 = obd_set_info_async(env, tgt->ltd_exp, keylen, key,
2185                                          vallen, val, set);
2186                 if (rc2 != 0 && rc == 0)
2187                         rc = rc2;
2188         }
2189         lod_putref(d, &d->lod_ost_descs);
2190
2191         lod_getref(&d->lod_mdt_descs);
2192         lod_foreach_mdt(d, i) {
2193                 tgt = MDT_TGT(d, i);
2194                 LASSERT(tgt && tgt->ltd_tgt);
2195                 if (!tgt->ltd_active)
2196                         continue;
2197                 rc2 = obd_set_info_async(env, tgt->ltd_exp, keylen, key,
2198                                          vallen, val, set);
2199                 if (rc2 != 0 && rc == 0)
2200                         rc = rc2;
2201         }
2202         lod_putref(d, &d->lod_mdt_descs);
2203
2204
2205         if (no_set) {
2206                 rc2 = ptlrpc_set_wait(env, set);
2207                 if (rc2 == 0 && rc == 0)
2208                         rc = rc2;
2209                 ptlrpc_set_destroy(set);
2210         }
2211         RETURN(rc);
2212 }
2213
2214 static struct obd_ops lod_obd_device_ops = {
2215         .o_owner        = THIS_MODULE,
2216         .o_connect      = lod_obd_connect,
2217         .o_disconnect   = lod_obd_disconnect,
2218         .o_get_info     = lod_obd_get_info,
2219         .o_set_info_async = lod_obd_set_info_async,
2220         .o_pool_new     = lod_pool_new,
2221         .o_pool_rem     = lod_pool_remove,
2222         .o_pool_add     = lod_pool_add,
2223         .o_pool_del     = lod_pool_del,
2224 };
2225
2226 static struct obd_type *sym;
2227
2228 static int __init lod_init(void)
2229 {
2230         int rc;
2231
2232         rc = lu_kmem_init(lod_caches);
2233         if (rc)
2234                 return rc;
2235
2236         rc = class_register_type(&lod_obd_device_ops, NULL, true, NULL,
2237                                  LUSTRE_LOD_NAME, &lod_device_type);
2238         if (rc) {
2239                 lu_kmem_fini(lod_caches);
2240                 return rc;
2241         }
2242
2243         /* create "lov" entry for compatibility purposes */
2244         sym = class_add_symlinks(LUSTRE_LOV_NAME, true);
2245         if (IS_ERR(sym)) {
2246                 rc = PTR_ERR(sym);
2247                 /* does real "lov" already exist ? */
2248                 if (rc == -EEXIST)
2249                         rc = 0;
2250         }
2251
2252         return rc;
2253 }
2254
2255 static void __exit lod_exit(void)
2256 {
2257         if (!IS_ERR_OR_NULL(sym))
2258                 kobject_put(&sym->typ_kobj);
2259
2260         class_unregister_type(LUSTRE_LOD_NAME);
2261         lu_kmem_fini(lod_caches);
2262 }
2263
2264 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2265 MODULE_DESCRIPTION("Lustre Logical Object Device ("LUSTRE_LOD_NAME")");
2266 MODULE_VERSION(LUSTRE_VERSION_STRING);
2267 MODULE_LICENSE("GPL");
2268
2269 module_init(lod_init);
2270 module_exit(lod_exit);