Whamcloud - gitweb
5b2b38f26174ed8a34df6e3f08097213fa2169c8
[fs/lustre-release.git] / lustre / lod / lod_dev.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright  2009 Sun Microsystems, Inc. All rights reserved
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/lod/lod_dev.c
33  *
34  * Lustre Logical Object Device
35  *
36  * Author: Alex Zhuravlev <alexey.zhuravlev@intel.com>
37  * Author: Mikhail Pershin <mike.pershin@intel.com>
38  */
39 /**
40  * The Logical Object Device (LOD) layer manages access to striped
41  * objects (both regular files and directories). It implements the DT
42  * device and object APIs and is responsible for creating, storing,
43  * and loading striping information as an extended attribute of the
44  * underlying OSD object. LOD is the server side analog of the LOV and
45  * LMV layers on the client side.
46  *
47  * Metadata LU object stack (layers of the same compound LU object,
48  * all have the same FID):
49  *
50  *        MDT
51  *         |      MD API
52  *        MDD
53  *         |      DT API
54  *        LOD
55  *       /   \    DT API
56  *     OSD   OSP
57  *
58  * During LOD object initialization the localness or remoteness of the
59  * object FID dictates the choice between OSD and OSP.
60  *
61  * An LOD object (file or directory) with N stripes (each has a
62  * different FID):
63  *
64  *          LOD
65  *           |
66  *   +---+---+---+...+
67  *   |   |   |   |   |
68  *   S0  S1  S2  S3  S(N-1)  OS[DP] objects, seen as DT objects by LOD
69  *
70  * When upper layers must access an object's stripes (which are
71  * themselves OST or MDT LU objects) LOD finds these objects by their
72  * FIDs and stores them as an array of DT object pointers on the
73  * object. Declarations and operations on LOD objects are received by
74  * LOD (as DT object operations) and performed on the underlying
75  * OS[DP] object and (as needed) on the stripes. From the perspective
76  * of LOD, a stripe-less file (created by mknod() or open with
77  * O_LOV_DELAY_CREATE) is an object which does not yet have stripes,
78  * while a non-striped directory (created by mkdir()) is an object
79  * which will never have stripes.
80  *
81  * The LOD layer also implements a small subset of the OBD device API
82  * to support MDT stack initialization and finalization (an MDD device
83  * connects and disconnects itself to and from the underlying LOD
84  * device), and pool management. In turn LOD uses the OBD device API
85  * to connect it self to the underlying OSD, and to connect itself to
86  * OSP devices representing the MDTs and OSTs that bear the stripes of
87  * its objects.
88  */
89
90 #define DEBUG_SUBSYSTEM S_MDS
91
92 #include <linux/kthread.h>
93 #include <obd_class.h>
94 #include <md_object.h>
95 #include <lustre_fid.h>
96 #include <uapi/linux/lustre/lustre_param.h>
97 #include <lustre_update.h>
98 #include <lustre_log.h>
99
100 #include "lod_internal.h"
101
102 static const char lod_update_log_name[] = "update_log";
103 static const char lod_update_log_dir_name[] = "update_log_dir";
104
105 /*
106  * Lookup target by FID.
107  *
108  * Lookup MDT/OST target index by FID. Type of the target can be
109  * specific or any.
110  *
111  * \param[in] env               LU environment provided by the caller
112  * \param[in] lod               lod device
113  * \param[in] fid               FID
114  * \param[out] tgt              result target index
115  * \param[in] type              expected type of the target:
116  *                              LU_SEQ_RANGE_{MDT,OST,ANY}
117  *
118  * \retval 0                    on success
119  * \retval negative             negated errno on error
120  **/
121 int lod_fld_lookup(const struct lu_env *env, struct lod_device *lod,
122                    const struct lu_fid *fid, u32 *tgt, int *type)
123 {
124         struct lu_seq_range range = { 0 };
125         struct lu_server_fld *server_fld;
126         int rc;
127
128         ENTRY;
129
130         if (!fid_is_sane(fid)) {
131                 CERROR("%s: invalid FID "DFID"\n", lod2obd(lod)->obd_name,
132                        PFID(fid));
133                 RETURN(-EIO);
134         }
135
136         if (fid_is_idif(fid)) {
137                 *tgt = fid_idif_ost_idx(fid);
138                 *type = LU_SEQ_RANGE_OST;
139                 RETURN(0);
140         }
141
142         if (fid_is_update_log(fid) || fid_is_update_log_dir(fid)) {
143                 *tgt = fid_oid(fid);
144                 *type = LU_SEQ_RANGE_MDT;
145                 RETURN(0);
146         }
147
148         if (!lod->lod_initialized || (!fid_seq_in_fldb(fid_seq(fid)))) {
149                 LASSERT(lu_site2seq(lod2lu_dev(lod)->ld_site) != NULL);
150
151                 *tgt = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
152                 *type = LU_SEQ_RANGE_MDT;
153                 RETURN(0);
154         }
155
156         server_fld = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_server_fld;
157         if (!server_fld)
158                 RETURN(-EIO);
159
160         fld_range_set_type(&range, *type);
161         rc = fld_server_lookup(env, server_fld, fid_seq(fid), &range);
162         if (rc != 0)
163                 RETURN(rc);
164
165         *tgt = range.lsr_index;
166         *type = range.lsr_flags;
167
168         CDEBUG(D_INFO, "%s: got tgt %x for sequence: %#llx\n",
169                lod2obd(lod)->obd_name, *tgt, fid_seq(fid));
170
171         RETURN(0);
172 }
173
174 /* Slab for OSD object allocation */
175 struct kmem_cache *lod_object_kmem;
176
177 /* Slab for dt_txn_callback */
178 struct kmem_cache *lod_txn_callback_kmem;
179 static struct lu_kmem_descr lod_caches[] = {
180         {
181                 .ckd_cache = &lod_object_kmem,
182                 .ckd_name  = "lod_obj",
183                 .ckd_size  = sizeof(struct lod_object)
184         },
185         {
186                 .ckd_cache = &lod_txn_callback_kmem,
187                 .ckd_name  = "lod_txn_callback",
188                 .ckd_size  = sizeof(struct dt_txn_callback)
189         },
190         {
191                 .ckd_cache = NULL
192         }
193 };
194
195 static struct lu_device *lod_device_fini(const struct lu_env *env,
196                                          struct lu_device *d);
197
198 /**
199  * Implementation of lu_device_operations::ldo_object_alloc() for LOD
200  *
201  * Allocates and initializes LOD's slice in the given object.
202  *
203  * see include/lu_object.h for the details.
204  */
205 static struct lu_object *lod_object_alloc(const struct lu_env *env,
206                                           const struct lu_object_header *hdr,
207                                           struct lu_device *dev)
208 {
209         struct lod_object *lod_obj;
210         struct lu_object *lu_obj;
211
212         ENTRY;
213
214         OBD_SLAB_ALLOC_PTR_GFP(lod_obj, lod_object_kmem, GFP_NOFS);
215         if (!lod_obj)
216                 RETURN(ERR_PTR(-ENOMEM));
217
218         mutex_init(&lod_obj->ldo_layout_mutex);
219         lu_obj = lod2lu_obj(lod_obj);
220         dt_object_init(&lod_obj->ldo_obj, NULL, dev);
221         lod_obj->ldo_obj.do_ops = &lod_obj_ops;
222         lu_obj->lo_ops = &lod_lu_obj_ops;
223
224         RETURN(lu_obj);
225 }
226
227 /**
228  * Process the config log for all sub device.
229  *
230  * The function goes through all the targets in the given table
231  * and apply given configuration command on to the targets.
232  * Used to cleanup the targets at unmount.
233  *
234  * \param[in] env               LU environment provided by the caller
235  * \param[in] lod               lod device
236  * \param[in] ltd               target's table to go through
237  * \param[in] lcfg              configuration command to apply
238  *
239  * \retval 0                    on success
240  * \retval negative             negated errno on error
241  **/
242 static int lod_sub_process_config(const struct lu_env *env,
243                                  struct lod_device *lod,
244                                  struct lod_tgt_descs *ltd,
245                                  struct lustre_cfg *lcfg)
246 {
247         struct lu_device *next;
248         struct lu_tgt_desc *tgt;
249         int rc = 0;
250
251         lod_getref(ltd);
252         ltd_foreach_tgt(ltd, tgt) {
253                 int rc1;
254
255                 LASSERT(tgt && tgt->ltd_tgt);
256                 next = &tgt->ltd_tgt->dd_lu_dev;
257                 rc1 = next->ld_ops->ldo_process_config(env, next, lcfg);
258                 if (rc1) {
259                         CERROR("%s: error cleaning up LOD index %u: cmd %#x : rc = %d\n",
260                                lod2obd(lod)->obd_name, tgt->ltd_index,
261                                lcfg->lcfg_command, rc1);
262                         rc = rc1;
263                 }
264         }
265         lod_putref(lod, ltd);
266         return rc;
267 }
268
269 struct lod_recovery_data {
270         struct lod_device       *lrd_lod;
271         struct lod_tgt_desc     *lrd_ltd;
272         struct ptlrpc_thread    *lrd_thread;
273         u32                     lrd_idx;
274 };
275
276
277 /**
278  * process update recovery record
279  *
280  * Add the update recovery recode to the update recovery list in
281  * lod_recovery_data. Then the recovery thread (target_recovery_thread)
282  * will redo these updates.
283  *
284  * \param[in]env        execution environment
285  * \param[in]llh        log handle of update record
286  * \param[in]rec        update record to be replayed
287  * \param[in]data       update recovery data which holds the necessary
288  *                      arguments for recovery (see struct lod_recovery_data)
289  *
290  * \retval              0 if the record is processed successfully.
291  * \retval              negative errno if the record processing fails.
292  */
293 static int lod_process_recovery_updates(const struct lu_env *env,
294                                         struct llog_handle *llh,
295                                         struct llog_rec_hdr *rec,
296                                         void *data)
297 {
298         struct lod_recovery_data *lrd = data;
299         struct llog_cookie *cookie = &lod_env_info(env)->lti_cookie;
300         struct lu_target *lut;
301         u32 index = 0;
302
303         ENTRY;
304
305         if (!lrd->lrd_ltd) {
306                 int rc;
307
308                 rc = lodname2mdt_index(lod2obd(lrd->lrd_lod)->obd_name, &index);
309                 if (rc != 0)
310                         return rc;
311         } else {
312                 index = lrd->lrd_ltd->ltd_index;
313         }
314
315         if (rec->lrh_len !=
316                 llog_update_record_size((struct llog_update_record *)rec)) {
317                 CERROR("%s: broken update record! index %u "DFID".%u: rc = %d\n",
318                        lod2obd(lrd->lrd_lod)->obd_name, index,
319                        PFID(&llh->lgh_id.lgl_oi.oi_fid), rec->lrh_index, -EIO);
320                 return -EINVAL;
321         }
322
323         cookie->lgc_lgl = llh->lgh_id;
324         cookie->lgc_index = rec->lrh_index;
325         cookie->lgc_subsys = LLOG_UPDATELOG_ORIG_CTXT;
326
327         CDEBUG(D_HA, "%s: process recovery updates "DFID".%u\n",
328                lod2obd(lrd->lrd_lod)->obd_name,
329                PFID(&llh->lgh_id.lgl_oi.oi_fid), rec->lrh_index);
330         lut = lod2lu_dev(lrd->lrd_lod)->ld_site->ls_tgt;
331
332         if (lut->lut_obd->obd_stopping ||
333             lut->lut_obd->obd_abort_recovery)
334                 return -ESHUTDOWN;
335
336         return insert_update_records_to_replay_list(lut->lut_tdtd,
337                                         (struct llog_update_record *)rec,
338                                         cookie, index);
339 }
340
341 /**
342  * recovery thread for update log
343  *
344  * Start recovery thread and prepare the sub llog, then it will retrieve
345  * the update records from the correpondent MDT and do recovery.
346  *
347  * \param[in] arg       pointer to the recovery data
348  *
349  * \retval              0 if recovery succeeds
350  * \retval              negative errno if recovery failed.
351  */
352 static int lod_sub_recovery_thread(void *arg)
353 {
354         struct lod_recovery_data *lrd = arg;
355         struct lod_device *lod = lrd->lrd_lod;
356         struct dt_device *dt;
357         struct ptlrpc_thread *thread = lrd->lrd_thread;
358         struct llog_ctxt *ctxt = NULL;
359         struct lu_env env;
360         struct lu_target *lut;
361         struct lu_tgt_desc *mdt = NULL;
362         time64_t start;
363         int retries = 0;
364         int rc;
365
366         ENTRY;
367
368         thread->t_flags = SVC_RUNNING;
369         wake_up(&thread->t_ctl_waitq);
370
371         rc = lu_env_init(&env, LCT_LOCAL | LCT_MD_THREAD);
372         if (rc != 0) {
373                 OBD_FREE_PTR(lrd);
374                 CERROR("%s: can't initialize env: rc = %d\n",
375                        lod2obd(lod)->obd_name, rc);
376                 RETURN(rc);
377         }
378
379         lut = lod2lu_dev(lod)->ld_site->ls_tgt;
380         atomic_inc(&lut->lut_tdtd->tdtd_recovery_threads_count);
381         if (!lrd->lrd_ltd)
382                 dt = lod->lod_child;
383         else
384                 dt = lrd->lrd_ltd->ltd_tgt;
385
386         start = ktime_get_real_seconds();
387
388 again:
389
390         if (unlikely(OBD_FAIL_PRECHECK(OBD_FAIL_TGT_RECOVERY_CONNECT)) &&
391             lrd->lrd_ltd) {
392                 OBD_FAIL_TIMEOUT(OBD_FAIL_TGT_RECOVERY_CONNECT, cfs_fail_val);
393                 rc = -EIO;
394         } else {
395                 rc = lod_sub_prep_llog(&env, lod, dt, lrd->lrd_idx);
396         }
397         if (!rc && !lod->lod_child->dd_rdonly) {
398                 /* Process the recovery record */
399                 ctxt = llog_get_context(dt->dd_lu_dev.ld_obd,
400                                         LLOG_UPDATELOG_ORIG_CTXT);
401                 LASSERT(ctxt != NULL);
402                 LASSERT(ctxt->loc_handle != NULL);
403
404                 rc = llog_cat_process(&env, ctxt->loc_handle,
405                                       lod_process_recovery_updates, lrd, 0, 0);
406         }
407
408         if (rc < 0) {
409                 struct lu_device *top_device;
410
411                 top_device = lod->lod_dt_dev.dd_lu_dev.ld_site->ls_top_dev;
412                 /*
413                  * Because the remote target might failover at the same time,
414                  * let's retry here
415                  */
416                 if ((rc == -ETIMEDOUT || rc == -EAGAIN || rc == -EIO) &&
417                      dt != lod->lod_child &&
418                     !top_device->ld_obd->obd_abort_recovery &&
419                     !top_device->ld_obd->obd_stopping) {
420                         if (ctxt) {
421                                 if (ctxt->loc_handle)
422                                         llog_cat_close(&env,
423                                                        ctxt->loc_handle);
424                                 llog_ctxt_put(ctxt);
425                         }
426                         retries++;
427                         CDEBUG(D_HA, "%s get update log failed %d, retry\n",
428                                dt->dd_lu_dev.ld_obd->obd_name, rc);
429                         goto again;
430                 }
431
432                 CERROR("%s get update log failed: rc = %d\n",
433                        dt->dd_lu_dev.ld_obd->obd_name, rc);
434                 llog_ctxt_put(ctxt);
435
436                 spin_lock(&top_device->ld_obd->obd_dev_lock);
437                 if (!top_device->ld_obd->obd_abort_recovery &&
438                     !top_device->ld_obd->obd_stopping)
439                         top_device->ld_obd->obd_abort_recovery = 1;
440                 spin_unlock(&top_device->ld_obd->obd_dev_lock);
441
442                 GOTO(out, rc);
443         }
444         llog_ctxt_put(ctxt);
445
446         CDEBUG(D_HA, "%s retrieved update log, duration %lld, retries %d\n",
447                dt->dd_lu_dev.ld_obd->obd_name, ktime_get_real_seconds() - start,
448                retries);
449
450         spin_lock(&lod->lod_lock);
451         if (!lrd->lrd_ltd)
452                 lod->lod_child_got_update_log = 1;
453         else
454                 lrd->lrd_ltd->ltd_got_update_log = 1;
455
456         if (!lod->lod_child_got_update_log) {
457                 spin_unlock(&lod->lod_lock);
458                 GOTO(out, rc = 0);
459         }
460
461         lod_foreach_mdt(lod, mdt) {
462                 if (!mdt->ltd_got_update_log) {
463                         spin_unlock(&lod->lod_lock);
464                         GOTO(out, rc = 0);
465                 }
466         }
467         lut->lut_tdtd->tdtd_replay_ready = 1;
468         spin_unlock(&lod->lod_lock);
469
470         CDEBUG(D_HA, "%s got update logs from all MDTs.\n",
471                lut->lut_obd->obd_name);
472         wake_up(&lut->lut_obd->obd_next_transno_waitq);
473         EXIT;
474
475 out:
476         OBD_FREE_PTR(lrd);
477         thread->t_flags = SVC_STOPPED;
478         atomic_dec(&lut->lut_tdtd->tdtd_recovery_threads_count);
479         wake_up(&lut->lut_tdtd->tdtd_recovery_threads_waitq);
480         wake_up(&thread->t_ctl_waitq);
481         lu_env_fini(&env);
482         return rc;
483 }
484
485 /**
486  * finish sub llog context
487  *
488  * Stop update recovery thread for the sub device, then cleanup the
489  * correspondent llog ctxt.
490  *
491  * \param[in] env      execution environment
492  * \param[in] lod      lod device to do update recovery
493  * \param[in] thread   recovery thread on this sub device
494  */
495 void lod_sub_fini_llog(const struct lu_env *env,
496                        struct dt_device *dt, struct ptlrpc_thread *thread)
497 {
498         struct obd_device *obd;
499         struct llog_ctxt *ctxt;
500
501         ENTRY;
502
503         obd = dt->dd_lu_dev.ld_obd;
504         CDEBUG(D_INFO, "%s: finish sub llog\n", obd->obd_name);
505         /* Stop recovery thread first */
506         if (thread && thread->t_flags & SVC_RUNNING) {
507                 thread->t_flags = SVC_STOPPING;
508                 wake_up(&thread->t_ctl_waitq);
509                 wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
510         }
511
512         ctxt = llog_get_context(obd, LLOG_UPDATELOG_ORIG_CTXT);
513         if (!ctxt)
514                 RETURN_EXIT;
515
516         if (ctxt->loc_handle)
517                 llog_cat_close(env, ctxt->loc_handle);
518
519         llog_cleanup(env, ctxt);
520
521         RETURN_EXIT;
522 }
523
524 /**
525  * Extract MDT target index from a device name.
526  *
527  * a helper function to extract index from the given device name
528  * like "fsname-MDTxxxx-mdtlov"
529  *
530  * \param[in] lodname           device name
531  * \param[out] mdt_index        extracted index
532  *
533  * \retval 0            on success
534  * \retval -EINVAL      if the name is invalid
535  */
536 int lodname2mdt_index(char *lodname, u32 *mdt_index)
537 {
538         u32 index;
539         const char *ptr, *tmp;
540         int rc;
541
542         /* 1.8 configs don't have "-MDT0000" at the end */
543         ptr = strstr(lodname, "-MDT");
544         if (!ptr) {
545                 *mdt_index = 0;
546                 return 0;
547         }
548
549         ptr = strrchr(lodname, '-');
550         if (!ptr) {
551                 rc = -EINVAL;
552                 CERROR("invalid MDT index in '%s': rc = %d\n", lodname, rc);
553                 return rc;
554         }
555
556         if (strncmp(ptr, "-mdtlov", 7) != 0) {
557                 rc = -EINVAL;
558                 CERROR("invalid MDT index in '%s': rc = %d\n", lodname, rc);
559                 return rc;
560         }
561
562         if ((unsigned long)ptr - (unsigned long)lodname <= 8) {
563                 rc = -EINVAL;
564                 CERROR("invalid MDT index in '%s': rc = %d\n", lodname, rc);
565                 return rc;
566         }
567
568         if (strncmp(ptr - 8, "-MDT", 4) != 0) {
569                 rc = -EINVAL;
570                 CERROR("invalid MDT index in '%s': rc = %d\n", lodname, rc);
571                 return rc;
572         }
573
574         rc = target_name2index(ptr - 7, &index, &tmp);
575         if (rc < 0 || rc & LDD_F_SV_ALL || *tmp != '-') {
576                 rc = -EINVAL;
577                 CERROR("invalid MDT index in '%s': rc = %d\n", lodname, rc);
578                 return rc;
579         }
580         *mdt_index = index;
581         return 0;
582 }
583
584 /**
585  * Init sub llog context
586  *
587  * Setup update llog ctxt for update recovery threads, then start the
588  * recovery thread (lod_sub_recovery_thread) to read update llog from
589  * the correspondent MDT to do update recovery.
590  *
591  * \param[in] env       execution environment
592  * \param[in] lod       lod device to do update recovery
593  * \param[in] dt        sub dt device for which the recovery thread is
594  *
595  * \retval              0 if initialization succeeds.
596  * \retval              negative errno if initialization fails.
597  */
598 int lod_sub_init_llog(const struct lu_env *env, struct lod_device *lod,
599                       struct dt_device *dt)
600 {
601         struct obd_device *obd;
602         struct lod_recovery_data *lrd = NULL;
603         struct ptlrpc_thread *thread;
604         struct task_struct *task;
605         struct lod_tgt_desc *subtgt = NULL;
606         u32 index;
607         u32 master_index;
608         int rc;
609
610         ENTRY;
611
612         rc = lodname2mdt_index(lod2obd(lod)->obd_name, &master_index);
613         if (rc != 0)
614                 RETURN(rc);
615
616         OBD_ALLOC_PTR(lrd);
617         if (!lrd)
618                 RETURN(-ENOMEM);
619
620         if (lod->lod_child == dt) {
621                 thread = &lod->lod_child_recovery_thread;
622                 index = master_index;
623         } else {
624                 struct lu_tgt_desc *mdt;
625
626                 lod_foreach_mdt(lod, mdt) {
627                         if (mdt->ltd_tgt == dt) {
628                                 index = mdt->ltd_index;
629                                 subtgt = mdt;
630                                 break;
631                         }
632                 }
633                 LASSERT(subtgt != NULL);
634                 OBD_ALLOC_PTR(subtgt->ltd_recovery_thread);
635                 if (!subtgt->ltd_recovery_thread)
636                         GOTO(free_lrd, rc = -ENOMEM);
637
638                 thread = subtgt->ltd_recovery_thread;
639         }
640
641         CDEBUG(D_INFO, "%s init sub log %s\n", lod2obd(lod)->obd_name,
642                dt->dd_lu_dev.ld_obd->obd_name);
643         lrd->lrd_lod = lod;
644         lrd->lrd_ltd = subtgt;
645         lrd->lrd_thread = thread;
646         lrd->lrd_idx = index;
647         init_waitqueue_head(&thread->t_ctl_waitq);
648
649         obd = dt->dd_lu_dev.ld_obd;
650         obd->obd_lvfs_ctxt.dt = dt;
651         rc = llog_setup(env, obd, &obd->obd_olg, LLOG_UPDATELOG_ORIG_CTXT,
652                         NULL, &llog_common_cat_ops);
653         if (rc < 0) {
654                 CERROR("%s: cannot setup updatelog llog: rc = %d\n",
655                        obd->obd_name, rc);
656                 GOTO(free_thread, rc);
657         }
658
659         /* Start the recovery thread */
660         task = kthread_run(lod_sub_recovery_thread, lrd, "lod%04x_rec%04x",
661                            master_index, index);
662         if (IS_ERR(task)) {
663                 rc = PTR_ERR(task);
664                 CERROR("%s: cannot start recovery thread: rc = %d\n",
665                        obd->obd_name, rc);
666                 GOTO(out_llog, rc);
667         }
668
669         wait_event_idle(thread->t_ctl_waitq, thread->t_flags & SVC_RUNNING ||
670                         thread->t_flags & SVC_STOPPED);
671
672         RETURN(0);
673 out_llog:
674         lod_sub_fini_llog(env, dt, thread);
675 free_thread:
676         if (lod->lod_child != dt) {
677                 OBD_FREE_PTR(subtgt->ltd_recovery_thread);
678                 subtgt->ltd_recovery_thread = NULL;
679         }
680 free_lrd:
681         OBD_FREE_PTR(lrd);
682         RETURN(rc);
683 }
684
685 /**
686  * Stop sub recovery thread
687  *
688  * Stop sub recovery thread on all subs.
689  *
690  * \param[in] env       execution environment
691  * \param[in] lod       lod device to do update recovery
692  */
693 static void lod_sub_stop_recovery_threads(const struct lu_env *env,
694                                           struct lod_device *lod)
695 {
696         struct ptlrpc_thread *thread;
697         struct lu_tgt_desc *mdt;
698
699         /*
700          * Stop the update log commit cancel threads and finish master
701          * llog ctxt
702          */
703         thread = &lod->lod_child_recovery_thread;
704         /* Stop recovery thread first */
705         if (thread && thread->t_flags & SVC_RUNNING) {
706                 thread->t_flags = SVC_STOPPING;
707                 wake_up(&thread->t_ctl_waitq);
708                 wait_event(thread->t_ctl_waitq, thread->t_flags & SVC_STOPPED);
709         }
710
711         lod_getref(&lod->lod_mdt_descs);
712         lod_foreach_mdt(lod, mdt) {
713                 thread = mdt->ltd_recovery_thread;
714                 if (thread && thread->t_flags & SVC_RUNNING) {
715                         thread->t_flags = SVC_STOPPING;
716                         wake_up(&thread->t_ctl_waitq);
717                         wait_event(thread->t_ctl_waitq,
718                                    thread->t_flags & SVC_STOPPED);
719                         OBD_FREE_PTR(mdt->ltd_recovery_thread);
720                         mdt->ltd_recovery_thread = NULL;
721                 }
722         }
723         lod_putref(lod, &lod->lod_mdt_descs);
724 }
725
726 /**
727  * finish all sub llog
728  *
729  * cleanup all of sub llog ctxt on the LOD.
730  *
731  * \param[in] env       execution environment
732  * \param[in] lod       lod device to do update recovery
733  */
734 static void lod_sub_fini_all_llogs(const struct lu_env *env,
735                                    struct lod_device *lod)
736 {
737         struct lu_tgt_desc *mdt;
738
739         /*
740          * Stop the update log commit cancel threads and finish master
741          * llog ctxt
742          */
743         lod_sub_fini_llog(env, lod->lod_child,
744                           &lod->lod_child_recovery_thread);
745         lod_getref(&lod->lod_mdt_descs);
746         lod_foreach_mdt(lod, mdt)
747                 lod_sub_fini_llog(env, mdt->ltd_tgt,
748                                   mdt->ltd_recovery_thread);
749         lod_putref(lod, &lod->lod_mdt_descs);
750 }
751
752 static char *lod_show_update_logs_retrievers(void *data, int *size, int *count)
753 {
754         struct lod_device *lod = (struct lod_device *)data;
755         struct lu_target *lut = lod2lu_dev(lod)->ld_site->ls_tgt;
756         struct lu_tgt_desc *mdt = NULL;
757         char *buf;
758         int len = 0;
759         int rc;
760         int i;
761
762         *count = atomic_read(&lut->lut_tdtd->tdtd_recovery_threads_count);
763         if (*count == 0) {
764                 *size = 0;
765                 return NULL;
766         }
767
768         *size = 5 * *count + 1;
769         OBD_ALLOC(buf, *size);
770         if (!buf)
771                 return NULL;
772
773         *count = 0;
774         memset(buf, 0, *size);
775
776         if (!lod->lod_child_got_update_log) {
777                 rc = lodname2mdt_index(lod2obd(lod)->obd_name, &i);
778                 LASSERTF(rc == 0, "Fail to parse target index: rc = %d\n", rc);
779
780                 rc = snprintf(buf + len, *size - len, " %04x", i);
781                 LASSERT(rc > 0);
782
783                 len += rc;
784                 (*count)++;
785         }
786
787         lod_foreach_mdt(lod, mdt) {
788                 if (!mdt->ltd_got_update_log) {
789                         rc = snprintf(buf + len, *size - len, " %04x",
790                                       mdt->ltd_index);
791                         if (unlikely(rc <= 0))
792                                 break;
793
794                         len += rc;
795                         (*count)++;
796                 }
797         }
798
799         return buf;
800 }
801
802 /**
803  * Prepare distribute txn
804  *
805  * Prepare distribute txn structure for LOD
806  *
807  * \param[in] env       execution environment
808  * \param[in] lod_device  LOD device
809  *
810  * \retval              0 if preparation succeeds.
811  * \retval              negative errno if preparation fails.
812  */
813 static int lod_prepare_distribute_txn(const struct lu_env *env,
814                                       struct lod_device *lod)
815 {
816         struct target_distribute_txn_data *tdtd;
817         struct lu_target *lut;
818         int rc;
819
820         ENTRY;
821
822         /* Init update recovery data */
823         OBD_ALLOC_PTR(tdtd);
824         if (!tdtd)
825                 RETURN(-ENOMEM);
826
827         lut = lod2lu_dev(lod)->ld_site->ls_tgt;
828         tdtd->tdtd_dt = &lod->lod_dt_dev;
829         rc = distribute_txn_init(env, lut, tdtd,
830                 lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id);
831
832         if (rc < 0) {
833                 CERROR("%s: cannot init distribute txn: rc = %d\n",
834                        lod2obd(lod)->obd_name, rc);
835                 OBD_FREE_PTR(tdtd);
836                 RETURN(rc);
837         }
838
839         tdtd->tdtd_show_update_logs_retrievers =
840                 lod_show_update_logs_retrievers;
841         tdtd->tdtd_show_retrievers_cbdata = lod;
842
843         lut->lut_tdtd = tdtd;
844
845         RETURN(0);
846 }
847
848 /**
849  * Finish distribute txn
850  *
851  * Release the resource holding by distribute txn, i.e. stop distribute
852  * txn thread.
853  *
854  * \param[in] env       execution environment
855  * \param[in] lod       lod device
856  */
857 static void lod_fini_distribute_txn(const struct lu_env *env,
858                                     struct lod_device *lod)
859 {
860         struct lu_target *lut;
861
862         lut = lod2lu_dev(lod)->ld_site->ls_tgt;
863         target_recovery_fini(lut->lut_obd);
864         if (!lut->lut_tdtd)
865                 return;
866
867         distribute_txn_fini(env, lut->lut_tdtd);
868
869         OBD_FREE_PTR(lut->lut_tdtd);
870         lut->lut_tdtd = NULL;
871 }
872
873 /**
874  * Implementation of lu_device_operations::ldo_process_config() for LOD
875  *
876  * The method is called by the configuration subsystem during setup,
877  * cleanup and when the configuration changes. The method processes
878  * few specific commands like adding/removing the targets, changing
879  * the runtime parameters.
880
881  * \param[in] env               LU environment provided by the caller
882  * \param[in] dev               lod device
883  * \param[in] lcfg              configuration command to apply
884  *
885  * \retval 0                    on success
886  * \retval negative             negated errno on error
887  *
888  * The examples are below.
889  *
890  * Add osc config log:
891  * marker  20 (flags=0x01, v2.2.49.56) lustre-OST0001  'add osc'
892  * add_uuid  nid=192.168.122.162@tcp(0x20000c0a87aa2)  0:  1:nidxxx
893  * attach    0:lustre-OST0001-osc-MDT0001  1:osc  2:lustre-MDT0001-mdtlov_UUID
894  * setup     0:lustre-OST0001-osc-MDT0001  1:lustre-OST0001_UUID  2:nid
895  * lov_modify_tgts add 0:lustre-MDT0001-mdtlov  1:lustre-OST0001_UUID  2:1  3:1
896  * marker  20 (flags=0x02, v2.2.49.56) lustre-OST0001  'add osc'
897  *
898  * Add mdc config log:
899  * marker  10 (flags=0x01, v2.2.49.56) lustre-MDT0000  'add osp'
900  * add_uuid  nid=192.168.122.162@tcp(0x20000c0a87aa2)  0:  1:nid
901  * attach 0:lustre-MDT0000-osp-MDT0001  1:osp  2:lustre-MDT0001-mdtlov_UUID
902  * setup     0:lustre-MDT0000-osp-MDT0001  1:lustre-MDT0000_UUID  2:nid
903  * modify_mdc_tgts add 0:lustre-MDT0001  1:lustre-MDT0000_UUID  2:0  3:1
904  * marker  10 (flags=0x02, v2.2.49.56) lustre-MDT0000_UUID  'add osp'
905  */
906 static int lod_process_config(const struct lu_env *env,
907                               struct lu_device *dev,
908                               struct lustre_cfg *lcfg)
909 {
910         struct lod_device *lod = lu2lod_dev(dev);
911         struct lu_device *next = &lod->lod_child->dd_lu_dev;
912         char *arg1;
913         int rc = 0;
914
915         ENTRY;
916
917         switch (lcfg->lcfg_command) {
918         case LCFG_LOV_DEL_OBD:
919         case LCFG_LOV_ADD_INA:
920         case LCFG_LOV_ADD_OBD:
921         case LCFG_ADD_MDC: {
922                 u32 index;
923                 u32 mdt_index;
924                 int gen;
925                 /*
926                  * lov_modify_tgts add  0:lov_mdsA  1:osp  2:0  3:1
927                  * modify_mdc_tgts add  0:lustre-MDT0001
928                  *                    1:lustre-MDT0001-mdc0002
929                  *                    2:2  3:1
930                  */
931                 arg1 = lustre_cfg_string(lcfg, 1);
932
933                 if (sscanf(lustre_cfg_buf(lcfg, 2), "%d", &index) != 1)
934                         GOTO(out, rc = -EINVAL);
935                 if (sscanf(lustre_cfg_buf(lcfg, 3), "%d", &gen) != 1)
936                         GOTO(out, rc = -EINVAL);
937
938                 if (lcfg->lcfg_command == LCFG_LOV_ADD_OBD) {
939                         u32 mdt_index;
940
941                         rc = lodname2mdt_index(lustre_cfg_string(lcfg, 0),
942                                                &mdt_index);
943                         if (rc != 0)
944                                 GOTO(out, rc);
945
946                         rc = lod_add_device(env, lod, arg1, index, gen,
947                                             mdt_index, LUSTRE_OSC_NAME, 1);
948                 } else if (lcfg->lcfg_command == LCFG_ADD_MDC) {
949                         mdt_index = index;
950                         rc = lod_add_device(env, lod, arg1, index, gen,
951                                             mdt_index, LUSTRE_MDC_NAME, 1);
952                 } else if (lcfg->lcfg_command == LCFG_LOV_ADD_INA) {
953                         /*FIXME: Add mdt_index for LCFG_LOV_ADD_INA*/
954                         mdt_index = 0;
955                         rc = lod_add_device(env, lod, arg1, index, gen,
956                                             mdt_index, LUSTRE_OSC_NAME, 0);
957                 } else {
958                         rc = lod_del_device(env, lod, &lod->lod_ost_descs,
959                                             arg1, index, gen);
960                 }
961
962                 break;
963         }
964
965         case LCFG_PARAM: {
966                 struct obd_device *obd;
967                 ssize_t count;
968                 char *param;
969
970                 /*
971                  * Check if it is activate/deactivate mdc
972                  * lustre-MDTXXXX-osp-MDTXXXX.active=1
973                  */
974                 param = lustre_cfg_buf(lcfg, 1);
975                 if (strstr(param, "osp") && strstr(param, ".active=")) {
976                         struct lod_tgt_desc *sub_tgt = NULL;
977                         struct lu_tgt_desc *mdt;
978                         char *ptr;
979                         char *tmp;
980
981                         ptr = strstr(param, ".");
982                         *ptr = '\0';
983                         obd = class_name2obd(param);
984                         if (!obd) {
985                                 CERROR("%s: can not find %s: rc = %d\n",
986                                        lod2obd(lod)->obd_name, param, -EINVAL);
987                                 *ptr = '.';
988                                 GOTO(out, rc);
989                         }
990
991                         lod_foreach_mdt(lod, mdt) {
992                                 if (mdt->ltd_tgt->dd_lu_dev.ld_obd == obd) {
993                                         sub_tgt = mdt;
994                                         break;
995                                 }
996                         }
997
998                         if (!sub_tgt) {
999                                 CERROR("%s: can not find %s: rc = %d\n",
1000                                        lod2obd(lod)->obd_name, param, -EINVAL);
1001                                 *ptr = '.';
1002                                 GOTO(out, rc);
1003                         }
1004
1005                         *ptr = '.';
1006                         tmp = strstr(param, "=");
1007                         tmp++;
1008                         if (*tmp == '1') {
1009                                 struct llog_ctxt *ctxt;
1010
1011                                 obd = sub_tgt->ltd_tgt->dd_lu_dev.ld_obd;
1012                                 ctxt = llog_get_context(obd,
1013                                                 LLOG_UPDATELOG_ORIG_CTXT);
1014                                 if (!ctxt) {
1015                                         rc = llog_setup(env, obd, &obd->obd_olg,
1016                                                        LLOG_UPDATELOG_ORIG_CTXT,
1017                                                     NULL, &llog_common_cat_ops);
1018                                         if (rc < 0)
1019                                                 GOTO(out, rc);
1020                                 } else {
1021                                         llog_ctxt_put(ctxt);
1022                                 }
1023                                 rc = lod_sub_prep_llog(env, lod,
1024                                                        sub_tgt->ltd_tgt,
1025                                                        sub_tgt->ltd_index);
1026                                 if (rc == 0)
1027                                         sub_tgt->ltd_active = 1;
1028                         } else {
1029                                 lod_sub_fini_llog(env, sub_tgt->ltd_tgt,
1030                                                   NULL);
1031                                 sub_tgt->ltd_active = 0;
1032                         }
1033                         GOTO(out, rc);
1034                 }
1035
1036
1037                 if (strstr(param, PARAM_LOD) != NULL)
1038                         count = class_modify_config(lcfg, PARAM_LOD,
1039                                                     &lod->lod_dt_dev.dd_kobj);
1040                 else
1041                         count = class_modify_config(lcfg, PARAM_LOV,
1042                                                     &lod->lod_dt_dev.dd_kobj);
1043                 rc = count > 0 ? 0 : count;
1044                 GOTO(out, rc);
1045         }
1046         case LCFG_PRE_CLEANUP: {
1047                 lod_sub_process_config(env, lod, &lod->lod_mdt_descs, lcfg);
1048                 lod_sub_process_config(env, lod, &lod->lod_ost_descs, lcfg);
1049                 OBD_FAIL_TIMEOUT(OBD_FAIL_TGT_RECOVERY_CONNECT, cfs_fail_val * 2);
1050                 next = &lod->lod_child->dd_lu_dev;
1051                 rc = next->ld_ops->ldo_process_config(env, next, lcfg);
1052                 if (rc != 0)
1053                         CDEBUG(D_HA, "%s: can't process %u: %d\n",
1054                                lod2obd(lod)->obd_name, lcfg->lcfg_command, rc);
1055
1056                 lod_sub_stop_recovery_threads(env, lod);
1057                 lod_fini_distribute_txn(env, lod);
1058                 lod_sub_fini_all_llogs(env, lod);
1059                 break;
1060         }
1061         case LCFG_CLEANUP: {
1062                 if (lod->lod_md_root) {
1063                         dt_object_put(env, &lod->lod_md_root->ldo_obj);
1064                         lod->lod_md_root = NULL;
1065                 }
1066
1067                 /*
1068                  * do cleanup on underlying storage only when
1069                  * all OSPs are cleaned up, as they use that OSD as well
1070                  */
1071                 lu_dev_del_linkage(dev->ld_site, dev);
1072                 lod_sub_process_config(env, lod, &lod->lod_mdt_descs, lcfg);
1073                 lod_sub_process_config(env, lod, &lod->lod_ost_descs, lcfg);
1074                 next = &lod->lod_child->dd_lu_dev;
1075                 rc = next->ld_ops->ldo_process_config(env, next, lcfg);
1076                 if (rc)
1077                         CERROR("%s: can't process %u: rc = %d\n",
1078                                lod2obd(lod)->obd_name, lcfg->lcfg_command, rc);
1079
1080                 rc = obd_disconnect(lod->lod_child_exp);
1081                 if (rc)
1082                         CERROR("error in disconnect from storage: rc = %d\n",
1083                                rc);
1084                 break;
1085         }
1086         default:
1087                 CERROR("%s: unknown command %u\n", lod2obd(lod)->obd_name,
1088                        lcfg->lcfg_command);
1089                 rc = -EINVAL;
1090                 break;
1091         }
1092
1093 out:
1094         RETURN(rc);
1095 }
1096
1097 /**
1098  * Implementation of lu_device_operations::ldo_recovery_complete() for LOD
1099  *
1100  * The method is called once the recovery is complete. This implementation
1101  * distributes the notification to all the known targets.
1102  *
1103  * see include/lu_object.h for the details
1104  */
1105 static int lod_recovery_complete(const struct lu_env *env,
1106                                  struct lu_device *dev)
1107 {
1108         struct lod_device *lod = lu2lod_dev(dev);
1109         struct lu_device *next = &lod->lod_child->dd_lu_dev;
1110         struct lod_tgt_desc *tgt;
1111         int rc;
1112
1113         ENTRY;
1114
1115         LASSERT(lod->lod_recovery_completed == 0);
1116         lod->lod_recovery_completed = 1;
1117
1118         rc = next->ld_ops->ldo_recovery_complete(env, next);
1119
1120         lod_getref(&lod->lod_ost_descs);
1121         if (lod->lod_ost_descs.ltd_tgts_size > 0) {
1122                 lod_foreach_ost(lod, tgt) {
1123                         LASSERT(tgt && tgt->ltd_tgt);
1124                         next = &tgt->ltd_tgt->dd_lu_dev;
1125                         rc = next->ld_ops->ldo_recovery_complete(env, next);
1126                         if (rc)
1127                                 CERROR("%s: can't complete recovery on #%d: rc = %d\n",
1128                                        lod2obd(lod)->obd_name, tgt->ltd_index,
1129                                        rc);
1130                 }
1131         }
1132         lod_putref(lod, &lod->lod_ost_descs);
1133         RETURN(rc);
1134 }
1135
1136 /**
1137  * Init update logs on all sub device
1138  *
1139  * LOD initialize update logs on all of sub devices. Because the initialization
1140  * process might need FLD lookup, see llog_osd_open()->dt_locate()->...->
1141  * lod_object_init(), this API has to be called after LOD is initialized.
1142  * \param[in] env       execution environment
1143  * \param[in] lod       lod device
1144  *
1145  * \retval              0 if update log is initialized successfully.
1146  * \retval              negative errno if initialization fails.
1147  */
1148 static int lod_sub_init_llogs(const struct lu_env *env, struct lod_device *lod)
1149 {
1150         struct lu_tgt_desc *mdt;
1151         int rc;
1152
1153         ENTRY;
1154
1155         /*
1156          * llog must be setup after LOD is initialized, because llog
1157          * initialization include FLD lookup
1158          */
1159         LASSERT(lod->lod_initialized);
1160
1161         /* Init the llog in its own stack */
1162         rc = lod_sub_init_llog(env, lod, lod->lod_child);
1163         if (rc < 0)
1164                 RETURN(rc);
1165
1166         lod_foreach_mdt(lod, mdt) {
1167                 rc = lod_sub_init_llog(env, lod, mdt->ltd_tgt);
1168                 if (rc != 0)
1169                         break;
1170         }
1171
1172         RETURN(rc);
1173 }
1174
1175 /**
1176  * Implementation of lu_device_operations::ldo_prepare() for LOD
1177  *
1178  * see include/lu_object.h for the details.
1179  */
1180 static int lod_prepare(const struct lu_env *env, struct lu_device *pdev,
1181                        struct lu_device *cdev)
1182 {
1183         struct lod_device *lod = lu2lod_dev(cdev);
1184         struct lu_device *next = &lod->lod_child->dd_lu_dev;
1185         struct lu_fid *fid = &lod_env_info(env)->lti_fid;
1186         int rc;
1187         struct dt_object *root;
1188         struct dt_object *dto;
1189         u32 index;
1190
1191         ENTRY;
1192
1193         rc = next->ld_ops->ldo_prepare(env, pdev, next);
1194         if (rc != 0) {
1195                 CERROR("%s: prepare bottom error: rc = %d\n",
1196                        lod2obd(lod)->obd_name, rc);
1197                 RETURN(rc);
1198         }
1199
1200         lod->lod_initialized = 1;
1201
1202         rc = dt_root_get(env, lod->lod_child, fid);
1203         if (rc < 0)
1204                 RETURN(rc);
1205
1206         root = dt_locate(env, lod->lod_child, fid);
1207         if (IS_ERR(root))
1208                 RETURN(PTR_ERR(root));
1209
1210         /* Create update log object */
1211         index = lu_site2seq(lod2lu_dev(lod)->ld_site)->ss_node_id;
1212         lu_update_log_fid(fid, index);
1213
1214         dto = local_file_find_or_create_with_fid(env, lod->lod_child,
1215                                                  fid, root,
1216                                                  lod_update_log_name,
1217                                                  S_IFREG | 0644);
1218         if (IS_ERR(dto))
1219                 GOTO(out_put, rc = PTR_ERR(dto));
1220
1221         dt_object_put(env, dto);
1222
1223         /* Create update log dir */
1224         lu_update_log_dir_fid(fid, index);
1225         dto = local_file_find_or_create_with_fid(env, lod->lod_child,
1226                                                  fid, root,
1227                                                  lod_update_log_dir_name,
1228                                                  S_IFDIR | 0644);
1229         if (IS_ERR(dto))
1230                 GOTO(out_put, rc = PTR_ERR(dto));
1231
1232         dt_object_put(env, dto);
1233
1234         rc = lod_prepare_distribute_txn(env, lod);
1235         if (rc != 0)
1236                 GOTO(out_put, rc);
1237
1238         rc = lod_sub_init_llogs(env, lod);
1239         if (rc != 0)
1240                 GOTO(out_put, rc);
1241
1242 out_put:
1243         dt_object_put(env, root);
1244
1245         RETURN(rc);
1246 }
1247
1248 const struct lu_device_operations lod_lu_ops = {
1249         .ldo_object_alloc       = lod_object_alloc,
1250         .ldo_process_config     = lod_process_config,
1251         .ldo_recovery_complete  = lod_recovery_complete,
1252         .ldo_prepare            = lod_prepare,
1253 };
1254
1255 /**
1256  * Implementation of dt_device_operations::dt_root_get() for LOD
1257  *
1258  * see include/dt_object.h for the details.
1259  */
1260 static int lod_root_get(const struct lu_env *env,
1261                         struct dt_device *dev, struct lu_fid *f)
1262 {
1263         return dt_root_get(env, dt2lod_dev(dev)->lod_child, f);
1264 }
1265
1266 static void lod_statfs_sum(struct obd_statfs *sfs,
1267                              struct obd_statfs *ost_sfs, int *bs)
1268 {
1269         while (ost_sfs->os_bsize < *bs) {
1270                 *bs >>= 1;
1271                 sfs->os_bsize >>= 1;
1272                 sfs->os_bavail <<= 1;
1273                 sfs->os_blocks <<= 1;
1274                 sfs->os_bfree <<= 1;
1275                 sfs->os_granted <<= 1;
1276         }
1277         while (ost_sfs->os_bsize > *bs) {
1278                 ost_sfs->os_bsize >>= 1;
1279                 ost_sfs->os_bavail <<= 1;
1280                 ost_sfs->os_blocks <<= 1;
1281                 ost_sfs->os_bfree <<= 1;
1282                 ost_sfs->os_granted <<= 1;
1283         }
1284         sfs->os_bavail += ost_sfs->os_bavail;
1285         sfs->os_blocks += ost_sfs->os_blocks;
1286         sfs->os_bfree += ost_sfs->os_bfree;
1287         sfs->os_granted += ost_sfs->os_granted;
1288 }
1289
1290 /**
1291  * Implementation of dt_device_operations::dt_statfs() for LOD
1292  *
1293  * see include/dt_object.h for the details.
1294  */
1295 static int lod_statfs(const struct lu_env *env, struct dt_device *dev,
1296                       struct obd_statfs *sfs, struct obd_statfs_info *info)
1297 {
1298         struct lod_device *lod = dt2lod_dev(dev);
1299         struct lu_tgt_desc *tgt;
1300         struct obd_statfs ost_sfs;
1301         u64 ost_files = 0;
1302         u64 ost_ffree = 0;
1303         int rc, bs;
1304
1305         rc = dt_statfs(env, dt2lod_dev(dev)->lod_child, sfs);
1306         if (rc)
1307                 GOTO(out, rc);
1308
1309         bs = sfs->os_bsize;
1310
1311         sfs->os_bavail = 0;
1312         sfs->os_blocks = 0;
1313         sfs->os_bfree = 0;
1314         sfs->os_granted = 0;
1315
1316         lod_getref(&lod->lod_mdt_descs);
1317         lod_foreach_mdt(lod, tgt) {
1318                 rc = dt_statfs(env, tgt->ltd_tgt, &ost_sfs);
1319                 /* ignore errors */
1320                 if (rc)
1321                         continue;
1322                 sfs->os_files += ost_sfs.os_files;
1323                 sfs->os_ffree += ost_sfs.os_ffree;
1324                 lod_statfs_sum(sfs, &ost_sfs, &bs);
1325         }
1326         lod_putref(lod, &lod->lod_mdt_descs);
1327
1328         /*
1329          * at some point we can check whether DoM is enabled and
1330          * decide how to account MDT space. for simplicity let's
1331          * just fallback to pre-DoM policy if any OST is alive
1332          */
1333         lod_getref(&lod->lod_ost_descs);
1334         lod_foreach_ost(lod, tgt) {
1335                 rc = dt_statfs(env, tgt->ltd_tgt, &ost_sfs);
1336                 /* ignore errors */
1337                 if (rc || ost_sfs.os_bsize == 0)
1338                         continue;
1339                 if (!ost_files) {
1340                         /*
1341                          * if only MDTs with DoM then report only MDT blocks,
1342                          * otherwise show only OST blocks, and DoM is "free"
1343                          */
1344                         sfs->os_bavail = 0;
1345                         sfs->os_blocks = 0;
1346                         sfs->os_bfree = 0;
1347                         sfs->os_granted = 0;
1348                 }
1349                 ost_files += ost_sfs.os_files;
1350                 ost_ffree += ost_sfs.os_ffree;
1351                 ost_sfs.os_bavail += ost_sfs.os_granted;
1352                 lod_statfs_sum(sfs, &ost_sfs, &bs);
1353                 LASSERTF(bs == ost_sfs.os_bsize, "%d != %d\n",
1354                         (int)sfs->os_bsize, (int)ost_sfs.os_bsize);
1355         }
1356         lod_putref(lod, &lod->lod_ost_descs);
1357         sfs->os_state |= OS_STATE_SUM;
1358
1359         /* If we have _some_ OSTs, but don't have as many free objects on the
1360          * OSTs as inodes on the MDTs, reduce the reported number of inodes
1361          * to compensate, so that the "inodes in use" number is correct.
1362          * This should be kept in sync with ll_statfs_internal().
1363          */
1364         if (ost_files && ost_ffree < sfs->os_ffree) {
1365                 sfs->os_files = (sfs->os_files - sfs->os_ffree) + ost_ffree;
1366                 sfs->os_ffree = ost_ffree;
1367         }
1368
1369         /* a single successful statfs should be enough */
1370         rc = 0;
1371
1372 out:
1373         RETURN(rc);
1374 }
1375
1376 /**
1377  * Implementation of dt_device_operations::dt_trans_create() for LOD
1378  *
1379  * Creates a transaction using local (to this node) OSD.
1380  *
1381  * see include/dt_object.h for the details.
1382  */
1383 static struct thandle *lod_trans_create(const struct lu_env *env,
1384                                         struct dt_device *dt)
1385 {
1386         struct thandle *th;
1387
1388         th = top_trans_create(env, dt2lod_dev(dt)->lod_child);
1389         if (IS_ERR(th))
1390                 return th;
1391
1392         th->th_dev = dt;
1393
1394         return th;
1395 }
1396
1397 /**
1398  * Implementation of dt_device_operations::dt_trans_start() for LOD
1399  *
1400  * Starts the set of local transactions using the targets involved
1401  * in declare phase. Initial support for the distributed transactions.
1402  *
1403  * see include/dt_object.h for the details.
1404  */
1405 static int lod_trans_start(const struct lu_env *env, struct dt_device *dt,
1406                            struct thandle *th)
1407 {
1408         return top_trans_start(env, dt2lod_dev(dt)->lod_child, th);
1409 }
1410
1411 static int lod_trans_cb_add(struct thandle *th,
1412                             struct dt_txn_commit_cb *dcb)
1413 {
1414         struct top_thandle      *top_th = container_of(th, struct top_thandle,
1415                                                        tt_super);
1416         return dt_trans_cb_add(top_th->tt_master_sub_thandle, dcb);
1417 }
1418
1419 /**
1420  * add noop update to the update records
1421  *
1422  * Add noop updates to the update records, which is only used in
1423  * test right now.
1424  *
1425  * \param[in] env       execution environment
1426  * \param[in] dt        dt device of lod
1427  * \param[in] th        thandle
1428  * \param[in] count     the count of update records to be added.
1429  *
1430  * \retval              0 if adding succeeds.
1431  * \retval              negative errno if adding fails.
1432  */
1433 static int lod_add_noop_records(const struct lu_env *env,
1434                                 struct dt_device *dt, struct thandle *th,
1435                                 int count)
1436 {
1437         struct top_thandle *top_th;
1438         struct lu_fid *fid = &lod_env_info(env)->lti_fid;
1439         int i;
1440         int rc = 0;
1441
1442         top_th = container_of(th, struct top_thandle, tt_super);
1443         if (!top_th->tt_multiple_thandle)
1444                 return 0;
1445
1446         fid_zero(fid);
1447         for (i = 0; i < count; i++) {
1448                 rc = update_record_pack(noop, th, fid);
1449                 if (rc < 0)
1450                         return rc;
1451         }
1452         return rc;
1453 }
1454
1455 /**
1456  * Implementation of dt_device_operations::dt_trans_stop() for LOD
1457  *
1458  * Stops the set of local transactions using the targets involved
1459  * in declare phase. Initial support for the distributed transactions.
1460  *
1461  * see include/dt_object.h for the details.
1462  */
1463 static int lod_trans_stop(const struct lu_env *env, struct dt_device *dt,
1464                           struct thandle *th)
1465 {
1466         if (OBD_FAIL_CHECK(OBD_FAIL_SPLIT_UPDATE_REC)) {
1467                 int rc;
1468
1469                 rc = lod_add_noop_records(env, dt, th, 5000);
1470                 if (rc < 0)
1471                         RETURN(rc);
1472         }
1473         return top_trans_stop(env, dt2lod_dev(dt)->lod_child, th);
1474 }
1475
1476 /**
1477  * Implementation of dt_device_operations::dt_conf_get() for LOD
1478  *
1479  * Currently returns the configuration provided by the local OSD.
1480  *
1481  * see include/dt_object.h for the details.
1482  */
1483 static void lod_conf_get(const struct lu_env *env,
1484                          const struct dt_device *dev,
1485                          struct dt_device_param *param)
1486 {
1487         dt_conf_get(env, dt2lod_dev((struct dt_device *)dev)->lod_child, param);
1488 }
1489
1490 /**
1491  * Implementation of dt_device_operations::dt_sync() for LOD
1492  *
1493  * Syncs all known OST targets. Very very expensive and used
1494  * rarely by LFSCK now. Should not be used in general.
1495  *
1496  * see include/dt_object.h for the details.
1497  */
1498 static int lod_sync(const struct lu_env *env, struct dt_device *dev)
1499 {
1500         struct lod_device *lod = dt2lod_dev(dev);
1501         struct lu_tgt_desc *tgt;
1502         int rc = 0;
1503
1504         ENTRY;
1505
1506         lod_getref(&lod->lod_ost_descs);
1507         lod_foreach_ost(lod, tgt) {
1508                 if (!tgt->ltd_active)
1509                         continue;
1510                 rc = dt_sync(env, tgt->ltd_tgt);
1511                 if (rc) {
1512                         if (rc != -ENOTCONN) {
1513                                 CERROR("%s: can't sync ost %u: rc = %d\n",
1514                                        lod2obd(lod)->obd_name, tgt->ltd_index,
1515                                        rc);
1516                                 break;
1517                         }
1518                         rc = 0;
1519                 }
1520         }
1521         lod_putref(lod, &lod->lod_ost_descs);
1522
1523         if (rc)
1524                 RETURN(rc);
1525
1526         lod_getref(&lod->lod_mdt_descs);
1527         lod_foreach_mdt(lod, tgt) {
1528                 if (!tgt->ltd_active)
1529                         continue;
1530                 rc = dt_sync(env, tgt->ltd_tgt);
1531                 if (rc) {
1532                         if (rc != -ENOTCONN) {
1533                                 CERROR("%s: can't sync mdt %u: rc = %d\n",
1534                                        lod2obd(lod)->obd_name, tgt->ltd_index,
1535                                        rc);
1536                                 break;
1537                         }
1538                         rc = 0;
1539                 }
1540         }
1541         lod_putref(lod, &lod->lod_mdt_descs);
1542
1543         if (rc == 0)
1544                 rc = dt_sync(env, lod->lod_child);
1545
1546         RETURN(rc);
1547 }
1548
1549 /**
1550  * Implementation of dt_device_operations::dt_ro() for LOD
1551  *
1552  * Turns local OSD read-only, used for the testing only.
1553  *
1554  * see include/dt_object.h for the details.
1555  */
1556 static int lod_ro(const struct lu_env *env, struct dt_device *dev)
1557 {
1558         return dt_ro(env, dt2lod_dev(dev)->lod_child);
1559 }
1560
1561 /**
1562  * Implementation of dt_device_operations::dt_commit_async() for LOD
1563  *
1564  * Asks local OSD to commit sooner.
1565  *
1566  * see include/dt_object.h for the details.
1567  */
1568 static int lod_commit_async(const struct lu_env *env, struct dt_device *dev)
1569 {
1570         return dt_commit_async(env, dt2lod_dev(dev)->lod_child);
1571 }
1572
1573 static const struct dt_device_operations lod_dt_ops = {
1574         .dt_root_get         = lod_root_get,
1575         .dt_statfs           = lod_statfs,
1576         .dt_trans_create     = lod_trans_create,
1577         .dt_trans_start      = lod_trans_start,
1578         .dt_trans_stop       = lod_trans_stop,
1579         .dt_conf_get         = lod_conf_get,
1580         .dt_sync             = lod_sync,
1581         .dt_ro               = lod_ro,
1582         .dt_commit_async     = lod_commit_async,
1583         .dt_trans_cb_add     = lod_trans_cb_add,
1584 };
1585
1586 /**
1587  * Connect to a local OSD.
1588  *
1589  * Used to connect to the local OSD at mount. OSD name is taken from the
1590  * configuration command passed. This connection is used to identify LU
1591  * site and pin the OSD from early removal.
1592  *
1593  * \param[in] env               LU environment provided by the caller
1594  * \param[in] lod               lod device
1595  * \param[in] cfg               configuration command to apply
1596  *
1597  * \retval 0                    on success
1598  * \retval negative             negated errno on error
1599  **/
1600 static int lod_connect_to_osd(const struct lu_env *env, struct lod_device *lod,
1601                               struct lustre_cfg *cfg)
1602 {
1603         struct obd_connect_data *data = NULL;
1604         struct obd_device *obd;
1605         char *nextdev = NULL, *p, *s;
1606         int rc, len = 0;
1607
1608         ENTRY;
1609
1610         LASSERT(cfg);
1611         LASSERT(lod->lod_child_exp == NULL);
1612
1613         /*
1614          * compatibility hack: we still use old config logs
1615          * which specify LOV, but we need to learn underlying
1616          * OSD device, which is supposed to be:
1617          *  <fsname>-MDTxxxx-osd
1618          *
1619          * 2.x MGS generates lines like the following:
1620          *   #03 (176)lov_setup 0:lustre-MDT0000-mdtlov  1:(struct lov_desc)
1621          * 1.8 MGS generates lines like the following:
1622          *   #03 (168)lov_setup 0:lustre-mdtlov  1:(struct lov_desc)
1623          *
1624          * we use "-MDT" to differentiate 2.x from 1.8
1625          */
1626         p = lustre_cfg_string(cfg, 0);
1627         if (p && strstr(p, "-mdtlov")) {
1628                 len = strlen(p) + 6;
1629                 OBD_ALLOC(nextdev, len);
1630                 if (!nextdev)
1631                         GOTO(out, rc = -ENOMEM);
1632
1633                 strcpy(nextdev, p);
1634                 s = strstr(nextdev, "-mdtlov");
1635                 if (unlikely(!s)) {
1636                         CERROR("%s: unable to parse device name: rc = %d\n",
1637                                lustre_cfg_string(cfg, 0), -EINVAL);
1638                         GOTO(out, rc = -EINVAL);
1639                 }
1640
1641                 if (strstr(nextdev, "-MDT")) {
1642                         /* 2.x config */
1643                         strcpy(s, "-osd");
1644                 } else {
1645                         /* 1.8 config */
1646                         strcpy(s, "-MDT0000-osd");
1647                 }
1648         } else {
1649                 CERROR("%s: unable to parse device name: rc = %d\n",
1650                        lustre_cfg_string(cfg, 0), -EINVAL);
1651                 GOTO(out, rc = -EINVAL);
1652         }
1653
1654         OBD_ALLOC_PTR(data);
1655         if (!data)
1656                 GOTO(out, rc = -ENOMEM);
1657
1658         obd = class_name2obd(nextdev);
1659         if (!obd) {
1660                 CERROR("%s: can not locate next device: rc = %d\n",
1661                        nextdev, -ENOTCONN);
1662                 GOTO(out, rc = -ENOTCONN);
1663         }
1664
1665         data->ocd_connect_flags = OBD_CONNECT_VERSION;
1666         data->ocd_version = LUSTRE_VERSION_CODE;
1667
1668         rc = obd_connect(env, &lod->lod_child_exp, obd, &obd->obd_uuid,
1669                          data, NULL);
1670         if (rc) {
1671                 CERROR("%s: cannot connect to next dev: rc = %d\n",
1672                        nextdev, rc);
1673                 GOTO(out, rc);
1674         }
1675
1676         lod->lod_dt_dev.dd_lu_dev.ld_site =
1677                 lod->lod_child_exp->exp_obd->obd_lu_dev->ld_site;
1678         LASSERT(lod->lod_dt_dev.dd_lu_dev.ld_site);
1679         lod->lod_child = lu2dt_dev(lod->lod_child_exp->exp_obd->obd_lu_dev);
1680
1681 out:
1682         if (data)
1683                 OBD_FREE_PTR(data);
1684         if (nextdev)
1685                 OBD_FREE(nextdev, len);
1686         RETURN(rc);
1687 }
1688
1689 /**
1690  * Initialize LOD device at setup.
1691  *
1692  * Initializes the given LOD device using the original configuration command.
1693  * The function initiates a connection to the local OSD and initializes few
1694  * internal structures like pools, target tables, etc.
1695  *
1696  * \param[in] env               LU environment provided by the caller
1697  * \param[in] lod               lod device
1698  * \param[in] ldt               not used
1699  * \param[in] cfg               configuration command
1700  *
1701  * \retval 0                    on success
1702  * \retval negative             negated errno on error
1703  **/
1704 static int lod_init0(const struct lu_env *env, struct lod_device *lod,
1705                      struct lu_device_type *ldt, struct lustre_cfg *cfg)
1706 {
1707         struct dt_device_param ddp;
1708         struct obd_device *obd;
1709         int rc;
1710
1711         ENTRY;
1712
1713         obd = class_name2obd(lustre_cfg_string(cfg, 0));
1714         if (!obd) {
1715                 rc = -ENODEV;
1716                 CERROR("Cannot find obd with name '%s': rc = %d\n",
1717                        lustre_cfg_string(cfg, 0), rc);
1718                 RETURN(rc);
1719         }
1720
1721         obd->obd_lu_dev = &lod->lod_dt_dev.dd_lu_dev;
1722         lod->lod_dt_dev.dd_lu_dev.ld_obd = obd;
1723         lod->lod_dt_dev.dd_lu_dev.ld_ops = &lod_lu_ops;
1724         lod->lod_dt_dev.dd_ops = &lod_dt_ops;
1725
1726         rc = lod_connect_to_osd(env, lod, cfg);
1727         if (rc)
1728                 RETURN(rc);
1729
1730         dt_conf_get(env, &lod->lod_dt_dev, &ddp);
1731         lod->lod_osd_max_easize = ddp.ddp_max_ea_size;
1732         lod->lod_dom_max_stripesize = (1ULL << 20); /* 1Mb as default value */
1733
1734         /* setup obd to be used with old lov code */
1735         rc = lod_pools_init(lod, cfg);
1736         if (rc)
1737                 GOTO(out_disconnect, rc);
1738
1739         rc = lod_procfs_init(lod);
1740         if (rc)
1741                 GOTO(out_pools, rc);
1742
1743         spin_lock_init(&lod->lod_lock);
1744         spin_lock_init(&lod->lod_connects_lock);
1745         lu_tgt_descs_init(&lod->lod_mdt_descs, true);
1746         lu_tgt_descs_init(&lod->lod_ost_descs, false);
1747
1748         RETURN(0);
1749
1750 out_pools:
1751         lod_pools_fini(lod);
1752 out_disconnect:
1753         obd_disconnect(lod->lod_child_exp);
1754         RETURN(rc);
1755 }
1756
1757 /**
1758  * Implementation of lu_device_type_operations::ldto_device_free() for LOD
1759  *
1760  * Releases the memory allocated for LOD device.
1761  *
1762  * see include/lu_object.h for the details.
1763  */
1764 static struct lu_device *lod_device_free(const struct lu_env *env,
1765                                          struct lu_device *lu)
1766 {
1767         struct lod_device *lod = lu2lod_dev(lu);
1768         struct lu_device  *next = &lod->lod_child->dd_lu_dev;
1769
1770         ENTRY;
1771
1772         if (atomic_read(&lu->ld_ref) > 0 &&
1773             !cfs_hash_is_empty(lu->ld_site->ls_obj_hash)) {
1774                 LIBCFS_DEBUG_MSG_DATA_DECL(msgdata, D_ERROR, NULL);
1775                 lu_site_print(env, lu->ld_site, &msgdata, lu_cdebug_printer);
1776         }
1777         LASSERTF(atomic_read(&lu->ld_ref) == 0, "lu is %p\n", lu);
1778         dt_device_fini(&lod->lod_dt_dev);
1779         OBD_FREE_PTR(lod);
1780         RETURN(next);
1781 }
1782
1783 /**
1784  * Implementation of lu_device_type_operations::ldto_device_alloc() for LOD
1785  *
1786  * Allocates LOD device and calls the helpers to initialize it.
1787  *
1788  * see include/lu_object.h for the details.
1789  */
1790 static struct lu_device *lod_device_alloc(const struct lu_env *env,
1791                                           struct lu_device_type *type,
1792                                           struct lustre_cfg *lcfg)
1793 {
1794         struct lod_device *lod;
1795         struct lu_device *lu_dev;
1796
1797         OBD_ALLOC_PTR(lod);
1798         if (!lod) {
1799                 lu_dev = ERR_PTR(-ENOMEM);
1800         } else {
1801                 int rc;
1802
1803                 lu_dev = lod2lu_dev(lod);
1804                 dt_device_init(&lod->lod_dt_dev, type);
1805                 rc = lod_init0(env, lod, type, lcfg);
1806                 if (rc != 0) {
1807                         lod_device_free(env, lu_dev);
1808                         lu_dev = ERR_PTR(rc);
1809                 }
1810         }
1811
1812         return lu_dev;
1813 }
1814
1815 static void lod_avoid_guide_fini(struct lod_avoid_guide *lag)
1816 {
1817         if (lag->lag_oss_avoid_array)
1818                 OBD_FREE(lag->lag_oss_avoid_array,
1819                          sizeof(u32) * lag->lag_oaa_size);
1820         if (lag->lag_ost_avoid_bitmap)
1821                 CFS_FREE_BITMAP(lag->lag_ost_avoid_bitmap);
1822 }
1823
1824 /**
1825  * Implementation of lu_device_type_operations::ldto_device_fini() for LOD
1826  *
1827  * Releases the internal resources used by LOD device.
1828  *
1829  * see include/lu_object.h for the details.
1830  */
1831 static struct lu_device *lod_device_fini(const struct lu_env *env,
1832                                          struct lu_device *d)
1833 {
1834         struct lod_device *lod = lu2lod_dev(d);
1835         int rc;
1836
1837         ENTRY;
1838
1839         lod_pools_fini(lod);
1840
1841         lod_procfs_fini(lod);
1842
1843         rc = lod_fini_tgt(env, lod, &lod->lod_ost_descs);
1844         if (rc)
1845                 CERROR("%s: can not fini ost descriptors: rc =  %d\n",
1846                         lod2obd(lod)->obd_name, rc);
1847
1848         rc = lod_fini_tgt(env, lod, &lod->lod_mdt_descs);
1849         if (rc)
1850                 CERROR("%s: can not fini mdt descriptors: rc =  %d\n",
1851                         lod2obd(lod)->obd_name, rc);
1852
1853         RETURN(NULL);
1854 }
1855
1856 /**
1857  * Implementation of obd_ops::o_connect() for LOD
1858  *
1859  * Used to track all the users of this specific LOD device,
1860  * so the device stays up until the last user disconnected.
1861  *
1862  * \param[in] env               LU environment provided by the caller
1863  * \param[out] exp              export the caller will be using to access LOD
1864  * \param[in] obd               OBD device representing LOD device
1865  * \param[in] cluuid            unique identifier of the caller
1866  * \param[in] data              not used
1867  * \param[in] localdata         not used
1868  *
1869  * \retval 0                    on success
1870  * \retval negative             negated errno on error
1871  **/
1872 static int lod_obd_connect(const struct lu_env *env, struct obd_export **exp,
1873                            struct obd_device *obd, struct obd_uuid *cluuid,
1874                            struct obd_connect_data *data, void *localdata)
1875 {
1876         struct lod_device *lod = lu2lod_dev(obd->obd_lu_dev);
1877         struct lustre_handle conn;
1878         int rc;
1879
1880         ENTRY;
1881
1882         CDEBUG(D_CONFIG, "connect #%d\n", lod->lod_connects);
1883
1884         rc = class_connect(&conn, obd, cluuid);
1885         if (rc)
1886                 RETURN(rc);
1887
1888         *exp = class_conn2export(&conn);
1889
1890         spin_lock(&lod->lod_connects_lock);
1891         lod->lod_connects++;
1892         /* at the moment we expect the only user */
1893         LASSERT(lod->lod_connects == 1);
1894         spin_unlock(&lod->lod_connects_lock);
1895
1896         RETURN(0);
1897 }
1898
1899 /**
1900  *
1901  * Implementation of obd_ops::o_disconnect() for LOD
1902  *
1903  * When the caller doesn't need to use this LOD instance, it calls
1904  * obd_disconnect() and LOD releases corresponding export/reference count.
1905  * Once all the users gone, LOD device is released.
1906  *
1907  * \param[in] exp               export provided to the caller in obd_connect()
1908  *
1909  * \retval 0                    on success
1910  * \retval negative             negated errno on error
1911  **/
1912 static int lod_obd_disconnect(struct obd_export *exp)
1913 {
1914         struct obd_device *obd = exp->exp_obd;
1915         struct lod_device *lod = lu2lod_dev(obd->obd_lu_dev);
1916         int rc, release = 0;
1917
1918         ENTRY;
1919
1920         /* Only disconnect the underlying layers on the final disconnect. */
1921         spin_lock(&lod->lod_connects_lock);
1922         lod->lod_connects--;
1923         if (lod->lod_connects != 0) {
1924                 /* why should there be more than 1 connect? */
1925                 spin_unlock(&lod->lod_connects_lock);
1926                 CERROR("%s: disconnect #%d\n", exp->exp_obd->obd_name,
1927                        lod->lod_connects);
1928                 goto out;
1929         }
1930         spin_unlock(&lod->lod_connects_lock);
1931
1932         /* the last user of lod has gone, let's release the device */
1933         release = 1;
1934
1935 out:
1936         rc = class_disconnect(exp); /* bz 9811 */
1937
1938         if (rc == 0 && release)
1939                 class_manual_cleanup(obd);
1940         RETURN(rc);
1941 }
1942
1943 LU_KEY_INIT(lod, struct lod_thread_info);
1944
1945 static void lod_key_fini(const struct lu_context *ctx,
1946                 struct lu_context_key *key, void *data)
1947 {
1948         struct lod_thread_info *info = data;
1949         struct lod_layout_component *lds =
1950                                 info->lti_def_striping.lds_def_comp_entries;
1951
1952         /*
1953          * allocated in lod_get_lov_ea
1954          * XXX: this is overload, a tread may have such store but used only
1955          * once. Probably better would be pool of such stores per LOD.
1956          */
1957         if (info->lti_ea_store) {
1958                 OBD_FREE_LARGE(info->lti_ea_store, info->lti_ea_store_size);
1959                 info->lti_ea_store = NULL;
1960                 info->lti_ea_store_size = 0;
1961         }
1962         lu_buf_free(&info->lti_linkea_buf);
1963
1964         if (lds)
1965                 lod_free_def_comp_entries(&info->lti_def_striping);
1966
1967         if (info->lti_comp_size > 0)
1968                 OBD_FREE(info->lti_comp_idx,
1969                          info->lti_comp_size * sizeof(u32));
1970
1971         lod_avoid_guide_fini(&info->lti_avoid);
1972
1973         OBD_FREE_PTR(info);
1974 }
1975
1976 /* context key: lod_thread_key */
1977 LU_CONTEXT_KEY_DEFINE(lod, LCT_MD_THREAD);
1978
1979 LU_TYPE_INIT_FINI(lod, &lod_thread_key);
1980
1981 static struct lu_device_type_operations lod_device_type_ops = {
1982         .ldto_init           = lod_type_init,
1983         .ldto_fini           = lod_type_fini,
1984
1985         .ldto_start          = lod_type_start,
1986         .ldto_stop           = lod_type_stop,
1987
1988         .ldto_device_alloc   = lod_device_alloc,
1989         .ldto_device_free    = lod_device_free,
1990
1991         .ldto_device_fini    = lod_device_fini
1992 };
1993
1994 static struct lu_device_type lod_device_type = {
1995         .ldt_tags     = LU_DEVICE_DT,
1996         .ldt_name     = LUSTRE_LOD_NAME,
1997         .ldt_ops      = &lod_device_type_ops,
1998         .ldt_ctx_tags = LCT_MD_THREAD,
1999 };
2000
2001 /**
2002  * Implementation of obd_ops::o_get_info() for LOD
2003  *
2004  * Currently, there is only one supported key: KEY_OSP_CONNECTED , to provide
2005  * the caller binary status whether LOD has seen connection to any OST target.
2006  * It will also check if the MDT update log context being initialized (if
2007  * needed).
2008  *
2009  * \param[in] env               LU environment provided by the caller
2010  * \param[in] exp               export of the caller
2011  * \param[in] keylen            len of the key
2012  * \param[in] key               the key
2013  * \param[in] vallen            not used
2014  * \param[in] val               not used
2015  *
2016  * \retval                      0 if a connection was seen
2017  * \retval                      -EAGAIN if LOD isn't running yet or no
2018  *                              connection has been seen yet
2019  * \retval                      -EINVAL if not supported key is requested
2020  **/
2021 static int lod_obd_get_info(const struct lu_env *env, struct obd_export *exp,
2022                             u32 keylen, void *key, u32 *vallen, void *val)
2023 {
2024         int rc = -EINVAL;
2025
2026         if (KEY_IS(KEY_OSP_CONNECTED)) {
2027                 struct obd_device *obd = exp->exp_obd;
2028                 struct lod_device *d;
2029                 struct lod_tgt_desc *tgt;
2030                 int rc = 1;
2031
2032                 if (!obd->obd_set_up || obd->obd_stopping)
2033                         RETURN(-EAGAIN);
2034
2035                 d = lu2lod_dev(obd->obd_lu_dev);
2036                 lod_getref(&d->lod_ost_descs);
2037                 lod_foreach_ost(d, tgt) {
2038                         rc = obd_get_info(env, tgt->ltd_exp, keylen, key,
2039                                           vallen, val);
2040                         /* one healthy device is enough */
2041                         if (rc == 0)
2042                                 break;
2043                 }
2044                 lod_putref(d, &d->lod_ost_descs);
2045
2046                 lod_getref(&d->lod_mdt_descs);
2047                 lod_foreach_mdt(d, tgt) {
2048                         struct llog_ctxt *ctxt;
2049
2050                         if (!tgt->ltd_active)
2051                                 continue;
2052
2053                         ctxt = llog_get_context(tgt->ltd_tgt->dd_lu_dev.ld_obd,
2054                                                 LLOG_UPDATELOG_ORIG_CTXT);
2055                         if (!ctxt) {
2056                                 CDEBUG(D_INFO, "%s: %s is not ready.\n",
2057                                        obd->obd_name,
2058                                       tgt->ltd_tgt->dd_lu_dev.ld_obd->obd_name);
2059                                 rc = -EAGAIN;
2060                                 break;
2061                         }
2062                         if (!ctxt->loc_handle) {
2063                                 CDEBUG(D_INFO, "%s: %s is not ready.\n",
2064                                        obd->obd_name,
2065                                       tgt->ltd_tgt->dd_lu_dev.ld_obd->obd_name);
2066                                 rc = -EAGAIN;
2067                                 llog_ctxt_put(ctxt);
2068                                 break;
2069                         }
2070                         llog_ctxt_put(ctxt);
2071                 }
2072                 lod_putref(d, &d->lod_mdt_descs);
2073
2074                 RETURN(rc);
2075         }
2076
2077         RETURN(rc);
2078 }
2079
2080 static int lod_obd_set_info_async(const struct lu_env *env,
2081                                   struct obd_export *exp,
2082                                   u32 keylen, void *key,
2083                                   u32 vallen, void *val,
2084                                   struct ptlrpc_request_set *set)
2085 {
2086         struct obd_device *obd = class_exp2obd(exp);
2087         struct lod_device *d;
2088         struct lod_tgt_desc *tgt;
2089         int no_set = 0;
2090         int rc = 0, rc2;
2091
2092         ENTRY;
2093
2094         if (!set) {
2095                 no_set = 1;
2096                 set = ptlrpc_prep_set();
2097                 if (!set)
2098                         RETURN(-ENOMEM);
2099         }
2100
2101         d = lu2lod_dev(obd->obd_lu_dev);
2102         lod_getref(&d->lod_ost_descs);
2103         lod_foreach_ost(d, tgt) {
2104                 if (!tgt->ltd_active)
2105                         continue;
2106
2107                 rc2 = obd_set_info_async(env, tgt->ltd_exp, keylen, key,
2108                                          vallen, val, set);
2109                 if (rc2 != 0 && rc == 0)
2110                         rc = rc2;
2111         }
2112         lod_putref(d, &d->lod_ost_descs);
2113
2114         lod_getref(&d->lod_mdt_descs);
2115         lod_foreach_mdt(d, tgt) {
2116                 if (!tgt->ltd_active)
2117                         continue;
2118                 rc2 = obd_set_info_async(env, tgt->ltd_exp, keylen, key,
2119                                          vallen, val, set);
2120                 if (rc2 != 0 && rc == 0)
2121                         rc = rc2;
2122         }
2123         lod_putref(d, &d->lod_mdt_descs);
2124
2125
2126         if (no_set) {
2127                 rc2 = ptlrpc_set_wait(env, set);
2128                 if (rc2 == 0 && rc == 0)
2129                         rc = rc2;
2130                 ptlrpc_set_destroy(set);
2131         }
2132         RETURN(rc);
2133 }
2134
2135 static struct obd_ops lod_obd_device_ops = {
2136         .o_owner        = THIS_MODULE,
2137         .o_connect      = lod_obd_connect,
2138         .o_disconnect   = lod_obd_disconnect,
2139         .o_get_info     = lod_obd_get_info,
2140         .o_set_info_async = lod_obd_set_info_async,
2141         .o_pool_new     = lod_pool_new,
2142         .o_pool_rem     = lod_pool_remove,
2143         .o_pool_add     = lod_pool_add,
2144         .o_pool_del     = lod_pool_del,
2145 };
2146
2147 static int __init lod_init(void)
2148 {
2149         struct obd_type *sym;
2150         int rc;
2151
2152         rc = lu_kmem_init(lod_caches);
2153         if (rc)
2154                 return rc;
2155
2156         rc = class_register_type(&lod_obd_device_ops, NULL, true, NULL,
2157                                  LUSTRE_LOD_NAME, &lod_device_type);
2158         if (rc) {
2159                 lu_kmem_fini(lod_caches);
2160                 return rc;
2161         }
2162
2163         /* create "lov" entry for compatibility purposes */
2164         sym = class_add_symlinks(LUSTRE_LOV_NAME, true);
2165         if (IS_ERR(sym)) {
2166                 rc = PTR_ERR(sym);
2167                 /* does real "lov" already exist ? */
2168                 if (rc == -EEXIST)
2169                         rc = 0;
2170         }
2171
2172         return rc;
2173 }
2174
2175 static void __exit lod_exit(void)
2176 {
2177         struct obd_type *sym = class_search_type(LUSTRE_LOV_NAME);
2178
2179         /* if this was never fully initialized by the lov layer
2180          * then we are responsible for freeing this obd_type
2181          */
2182         if (sym) {
2183                 /* final put if we manage this obd type */
2184                 if (sym->typ_sym_filter)
2185                         kobject_put(&sym->typ_kobj);
2186                 /* put reference taken by class_search_type */
2187                 kobject_put(&sym->typ_kobj);
2188         }
2189
2190         class_unregister_type(LUSTRE_LOD_NAME);
2191         lu_kmem_fini(lod_caches);
2192 }
2193
2194 MODULE_AUTHOR("OpenSFS, Inc. <http://www.lustre.org/>");
2195 MODULE_DESCRIPTION("Lustre Logical Object Device ("LUSTRE_LOD_NAME")");
2196 MODULE_VERSION(LUSTRE_VERSION_STRING);
2197 MODULE_LICENSE("GPL");
2198
2199 module_init(lod_init);
2200 module_exit(lod_exit);