Whamcloud - gitweb
LU-1303 mds: integration lod/osp into the stack
[fs/lustre-release.git] / lustre / mdd / mdd_device.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Whamcloud, Inc.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/mdd/mdd_device.c
37  *
38  * Lustre Metadata Server (mdd) routines
39  *
40  * Author: Wang Di <wangdi@clusterfs.com>
41  */
42
43 #define DEBUG_SUBSYSTEM S_MDS
44
45 #include <obd_class.h>
46 #include <lprocfs_status.h>
47 #include <lustre_fid.h>
48 #include <lustre_mds.h>
49 #include <lustre_disk.h>      /* for changelogs */
50 #include <lustre_param.h>
51 #include <lustre_fid.h>
52
53 #include "mdd_internal.h"
54
55 const struct md_device_operations mdd_ops;
56 static struct lu_device_type mdd_device_type;
57
58 static const char mdd_root_dir_name[] = "ROOT";
59 static const char mdd_obf_dir_name[] = "fid";
60
61 /* Slab for MDD object allocation */
62 cfs_mem_cache_t *mdd_object_kmem;
63
64 static struct lu_kmem_descr mdd_caches[] = {
65         {
66                 .ckd_cache = &mdd_object_kmem,
67                 .ckd_name  = "mdd_obj",
68                 .ckd_size  = sizeof(struct mdd_object)
69         },
70         {
71                 .ckd_cache = NULL
72         }
73 };
74
75 static int mdd_connect_to_next(const struct lu_env *env, struct mdd_device *m,
76                                const char *nextdev)
77 {
78         struct obd_connect_data *data = NULL;
79         struct lu_device        *lud = mdd2lu_dev(m);
80         struct obd_device       *obd;
81         int                      rc;
82         ENTRY;
83
84         LASSERT(m->mdd_child_exp == NULL);
85
86         OBD_ALLOC(data, sizeof(*data));
87         if (data == NULL)
88                 GOTO(out, rc = -ENOMEM);
89
90         obd = class_name2obd(nextdev);
91         if (obd == NULL) {
92                 CERROR("can't locate next device: %s\n", nextdev);
93                 GOTO(out, rc = -ENOTCONN);
94         }
95
96         data->ocd_connect_flags = OBD_CONNECT_VERSION;
97         data->ocd_version = LUSTRE_VERSION_CODE;
98
99         rc = obd_connect(NULL, &m->mdd_child_exp, obd, &obd->obd_uuid, data, NULL);
100         if (rc) {
101                 CERROR("cannot connect to next dev %s (%d)\n", nextdev, rc);
102                 GOTO(out, rc);
103         }
104
105         lud->ld_site = m->mdd_child_exp->exp_obd->obd_lu_dev->ld_site;
106         LASSERT(lud->ld_site);
107         m->mdd_child = lu2dt_dev(m->mdd_child_exp->exp_obd->obd_lu_dev);
108         lu_dev_add_linkage(lud->ld_site, lud);
109
110 out:
111         if (data)
112                 OBD_FREE(data, sizeof(*data));
113         RETURN(rc);
114 }
115
116 static int mdd_init0(const struct lu_env *env, struct mdd_device *mdd,
117                 struct lu_device_type *t, struct lustre_cfg *lcfg)
118 {
119         int rc;
120         ENTRY;
121
122         mdd->mdd_md_dev.md_lu_dev.ld_ops = &mdd_lu_ops;
123         mdd->mdd_md_dev.md_ops = &mdd_ops;
124
125         rc = mdd_connect_to_next(env, mdd, lustre_cfg_string(lcfg, 3));
126         if (rc)
127                 RETURN(rc);
128
129         mdd->mdd_atime_diff = MAX_ATIME_DIFF;
130         /* sync permission changes */
131         mdd->mdd_sync_permission = 1;
132
133         dt_conf_get(env, mdd->mdd_child, &mdd->mdd_dt_conf);
134
135         /* we are using service name but not mdd obd name
136          * for compatibility reasons.
137          * It is passed from MDT in lustre_cfg[2] buffer */
138         rc = mdd_procfs_init(mdd, lustre_cfg_string(lcfg, 2));
139         if (rc < 0)
140                 obd_disconnect(mdd->mdd_child_exp);
141
142         RETURN(rc);
143 }
144
145 static struct lu_device *mdd_device_fini(const struct lu_env *env,
146                                          struct lu_device *d)
147 {
148         struct mdd_device *mdd = lu2mdd_dev(d);
149         int rc;
150
151         if (d->ld_site)
152                 lu_dev_del_linkage(d->ld_site, d);
153
154         rc = mdd_procfs_fini(mdd);
155         if (rc) {
156                 CERROR("proc fini error %d \n", rc);
157                 return ERR_PTR(rc);
158         }
159         return NULL;
160 }
161
162 static void mdd_changelog_fini(const struct lu_env *env,
163                                struct mdd_device *mdd);
164
165 static void mdd_device_shutdown(const struct lu_env *env,
166                                 struct mdd_device *m, struct lustre_cfg *cfg)
167 {
168         ENTRY;
169         mdd_lfsck_cleanup(env, m);
170         mdd_changelog_fini(env, m);
171         if (m->mdd_dot_lustre_objs.mdd_obf)
172                 mdd_object_put(env, m->mdd_dot_lustre_objs.mdd_obf);
173         if (m->mdd_dot_lustre)
174                 mdd_object_put(env, m->mdd_dot_lustre);
175         orph_index_fini(env, m);
176         if (m->mdd_capa != NULL) {
177                 lu_object_put(env, &m->mdd_capa->do_lu);
178                 m->mdd_capa = NULL;
179         }
180         lu_site_purge(env, m->mdd_md_dev.md_lu_dev.ld_site, -1);
181         /* remove upcall device*/
182         md_upcall_fini(&m->mdd_md_dev);
183
184         if (m->mdd_child_exp)
185                 obd_disconnect(m->mdd_child_exp);
186
187         EXIT;
188 }
189
190 static int changelog_init_cb(const struct lu_env *env, struct llog_handle *llh,
191                              struct llog_rec_hdr *hdr, void *data)
192 {
193         struct mdd_device *mdd = (struct mdd_device *)data;
194         struct llog_changelog_rec *rec = (struct llog_changelog_rec *)hdr;
195         ENTRY;
196
197         LASSERT(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN);
198         LASSERT(rec->cr_hdr.lrh_type == CHANGELOG_REC);
199
200         CDEBUG(D_INFO,
201                "seeing record at index %d/%d/"LPU64" t=%x %.*s in log "LPX64"\n",
202                hdr->lrh_index, rec->cr_hdr.lrh_index, rec->cr.cr_index,
203                rec->cr.cr_type, rec->cr.cr_namelen, rec->cr.cr_name,
204                llh->lgh_id.lgl_oid);
205
206         mdd->mdd_cl.mc_index = rec->cr.cr_index;
207         RETURN(LLOG_PROC_BREAK);
208 }
209
210 static int changelog_user_init_cb(const struct lu_env *env,
211                                   struct llog_handle *llh,
212                                   struct llog_rec_hdr *hdr, void *data)
213 {
214         struct mdd_device *mdd = (struct mdd_device *)data;
215         struct llog_changelog_user_rec *rec =
216                 (struct llog_changelog_user_rec *)hdr;
217         ENTRY;
218
219         LASSERT(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN);
220         LASSERT(rec->cur_hdr.lrh_type == CHANGELOG_USER_REC);
221
222         CDEBUG(D_INFO, "seeing user at index %d/%d id=%d endrec="LPU64
223                " in log "LPX64"\n", hdr->lrh_index, rec->cur_hdr.lrh_index,
224                rec->cur_id, rec->cur_endrec, llh->lgh_id.lgl_oid);
225
226         cfs_spin_lock(&mdd->mdd_cl.mc_user_lock);
227         mdd->mdd_cl.mc_lastuser = rec->cur_id;
228         cfs_spin_unlock(&mdd->mdd_cl.mc_user_lock);
229
230         RETURN(LLOG_PROC_BREAK);
231 }
232
233
234 static int mdd_changelog_llog_init(struct mdd_device *mdd)
235 {
236         struct obd_device *obd = mdd2obd_dev(mdd);
237         struct llog_ctxt *ctxt;
238         int rc;
239
240         /* Find last changelog entry number */
241         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
242         if (ctxt == NULL) {
243                 CERROR("no changelog context\n");
244                 return -EINVAL;
245         }
246         if (!ctxt->loc_handle) {
247                 llog_ctxt_put(ctxt);
248                 return -EINVAL;
249         }
250
251         rc = llog_cat_reverse_process(NULL, ctxt->loc_handle,
252                                       changelog_init_cb, mdd);
253         llog_ctxt_put(ctxt);
254
255         if (rc < 0) {
256                 CERROR("changelog init failed: %d\n", rc);
257                 return rc;
258         }
259         CDEBUG(D_IOCTL, "changelog starting index="LPU64"\n",
260                mdd->mdd_cl.mc_index);
261
262         /* Find last changelog user id */
263         ctxt = llog_get_context(obd, LLOG_CHANGELOG_USER_ORIG_CTXT);
264         if (ctxt == NULL) {
265                 CERROR("no changelog user context\n");
266                 return -EINVAL;
267         }
268         if (!ctxt->loc_handle) {
269                 llog_ctxt_put(ctxt);
270                 return -EINVAL;
271         }
272
273         rc = llog_cat_reverse_process(NULL, ctxt->loc_handle,
274                                       changelog_user_init_cb, mdd);
275         llog_ctxt_put(ctxt);
276
277         if (rc < 0) {
278                 CERROR("changelog user init failed: %d\n", rc);
279                 return rc;
280         }
281
282         /* If we have registered users, assume we want changelogs on */
283         if (mdd->mdd_cl.mc_lastuser > 0)
284                 rc = mdd_changelog_on(mdd, 1);
285
286         return rc;
287 }
288
289 static int mdd_changelog_init(const struct lu_env *env, struct mdd_device *mdd)
290 {
291         int rc;
292
293         mdd->mdd_cl.mc_index = 0;
294         cfs_spin_lock_init(&mdd->mdd_cl.mc_lock);
295         mdd->mdd_cl.mc_starttime = cfs_time_current_64();
296         mdd->mdd_cl.mc_flags = 0; /* off by default */
297         mdd->mdd_cl.mc_mask = CHANGELOG_DEFMASK;
298         cfs_spin_lock_init(&mdd->mdd_cl.mc_user_lock);
299         mdd->mdd_cl.mc_lastuser = 0;
300
301         rc = mdd_changelog_llog_init(mdd);
302         if (rc) {
303                 CERROR("Changelog setup during init failed %d\n", rc);
304                 mdd->mdd_cl.mc_flags |= CLM_ERR;
305         }
306
307         return rc;
308 }
309
310 static void mdd_changelog_fini(const struct lu_env *env, struct mdd_device *mdd)
311 {
312         mdd->mdd_cl.mc_flags = 0;
313 }
314
315 /* Start / stop recording */
316 int mdd_changelog_on(struct mdd_device *mdd, int on)
317 {
318         int rc = 0;
319
320         if ((on == 1) && ((mdd->mdd_cl.mc_flags & CLM_ON) == 0)) {
321                 LCONSOLE_INFO("%s: changelog on\n", mdd2obd_dev(mdd)->obd_name);
322                 if (mdd->mdd_cl.mc_flags & CLM_ERR) {
323                         CERROR("Changelogs cannot be enabled due to error "
324                                "condition (see %s log).\n",
325                                mdd2obd_dev(mdd)->obd_name);
326                         rc = -ESRCH;
327                 } else {
328                         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
329                         mdd->mdd_cl.mc_flags |= CLM_ON;
330                         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
331                         rc = mdd_changelog_write_header(mdd, CLM_START);
332                 }
333         } else if ((on == 0) && ((mdd->mdd_cl.mc_flags & CLM_ON) == CLM_ON)) {
334                 LCONSOLE_INFO("%s: changelog off\n",mdd2obd_dev(mdd)->obd_name);
335                 rc = mdd_changelog_write_header(mdd, CLM_FINI);
336                 cfs_spin_lock(&mdd->mdd_cl.mc_lock);
337                 mdd->mdd_cl.mc_flags &= ~CLM_ON;
338                 cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
339         }
340         return rc;
341 }
342
343 static __u64 cl_time(void) {
344         cfs_fs_time_t time;
345
346         cfs_fs_time_current(&time);
347         return (((__u64)time.tv_sec) << 30) + time.tv_nsec;
348 }
349
350 /** Add a changelog entry \a rec to the changelog llog
351  * \param mdd
352  * \param rec
353  * \param handle - currently ignored since llogs start their own transaction;
354  *                 this will hopefully be fixed in llog rewrite
355  * \retval 0 ok
356  */
357 int mdd_changelog_llog_write(struct mdd_device         *mdd,
358                              struct llog_changelog_rec *rec,
359                              struct thandle            *handle)
360 {
361         struct obd_device *obd = mdd2obd_dev(mdd);
362         struct llog_ctxt *ctxt;
363         int rc;
364
365         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) + rec->cr.cr_namelen);
366         /* llog_lvfs_write_rec sets the llog tail len */
367         rec->cr_hdr.lrh_type = CHANGELOG_REC;
368         rec->cr.cr_time = cl_time();
369         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
370         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
371            but as long as the MDD transactions are ordered correctly for e.g.
372            rename conflicts, I don't think this should matter. */
373         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
374         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
375         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
376         if (ctxt == NULL)
377                 return -ENXIO;
378
379         /* nested journal transaction */
380         rc = llog_obd_add(NULL, ctxt, &rec->cr_hdr, NULL, NULL, 0);
381         llog_ctxt_put(ctxt);
382
383         return rc;
384 }
385
386 /** Add a changelog_ext entry \a rec to the changelog llog
387  * \param mdd
388  * \param rec
389  * \param handle - currently ignored since llogs start their own transaction;
390  *              this will hopefully be fixed in llog rewrite
391  * \retval 0 ok
392  */
393 int mdd_changelog_ext_llog_write(struct mdd_device *mdd,
394                                  struct llog_changelog_ext_rec *rec,
395                                  struct thandle *handle)
396 {
397         struct obd_device *obd = mdd2obd_dev(mdd);
398         struct llog_ctxt *ctxt;
399         int rc;
400
401         rec->cr_hdr.lrh_len = llog_data_len(sizeof(*rec) + rec->cr.cr_namelen);
402         /* llog_lvfs_write_rec sets the llog tail len */
403         rec->cr_hdr.lrh_type = CHANGELOG_REC;
404         rec->cr.cr_time = cl_time();
405         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
406         /* NB: I suppose it's possible llog_add adds out of order wrt cr_index,
407          * but as long as the MDD transactions are ordered correctly for e.g.
408          * rename conflicts, I don't think this should matter. */
409         rec->cr.cr_index = ++mdd->mdd_cl.mc_index;
410         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
411         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
412         if (ctxt == NULL)
413                 return -ENXIO;
414
415         /* nested journal transaction */
416         rc = llog_obd_add(NULL, ctxt, &rec->cr_hdr, NULL, NULL, 0);
417         llog_ctxt_put(ctxt);
418
419         return rc;
420 }
421
422 /** Remove entries with indicies up to and including \a endrec from the
423  *  changelog
424  * \param mdd
425  * \param endrec
426  * \retval 0 ok
427  */
428 int mdd_changelog_llog_cancel(const struct lu_env *env,
429                               struct mdd_device *mdd, long long endrec)
430 {
431         struct obd_device *obd = mdd2obd_dev(mdd);
432         struct llog_ctxt *ctxt;
433         long long unsigned cur;
434         int rc;
435
436         ctxt = llog_get_context(obd, LLOG_CHANGELOG_ORIG_CTXT);
437         if (ctxt == NULL)
438                 return -ENXIO;
439
440         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
441         cur = (long long)mdd->mdd_cl.mc_index;
442         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
443         if (endrec > cur)
444                 endrec = cur;
445
446         /* purge to "0" is shorthand for everything */
447         if (endrec == 0)
448                 endrec = cur;
449
450         /* If purging all records, write a header entry so we don't have an
451            empty catalog and we're sure to have a valid starting index next
452            time.  In case of crash, we just restart with old log so we're
453            allright. */
454         if (endrec == cur) {
455                 /* XXX: transaction is started by llog itself */
456                 rc = mdd_changelog_write_header(mdd, CLM_PURGE);
457                 if (rc)
458                       goto out;
459         }
460
461         /* Some records were purged, so reset repeat-access time (so we
462            record new mtime update records, so users can see a file has been
463            changed since the last purge) */
464         mdd->mdd_cl.mc_starttime = cfs_time_current_64();
465
466         /* XXX: transaction is started by llog itself */
467         rc = llog_cancel(env, ctxt, NULL, 1, (struct llog_cookie *)&endrec, 0);
468 out:
469         llog_ctxt_put(ctxt);
470         return rc;
471 }
472
473 /** Add a CL_MARK record to the changelog
474  * \param mdd
475  * \param markerflags - CLM_*
476  * \retval 0 ok
477  */
478 int mdd_changelog_write_header(struct mdd_device *mdd, int markerflags)
479 {
480         struct obd_device *obd = mdd2obd_dev(mdd);
481         struct llog_changelog_rec *rec;
482         int reclen;
483         int len = strlen(obd->obd_name);
484         int rc;
485         ENTRY;
486
487         reclen = llog_data_len(sizeof(*rec) + len);
488         OBD_ALLOC(rec, reclen);
489         if (rec == NULL)
490                 RETURN(-ENOMEM);
491
492         rec->cr.cr_flags = CLF_VERSION;
493         rec->cr.cr_type = CL_MARK;
494         rec->cr.cr_namelen = len;
495         memcpy(rec->cr.cr_name, obd->obd_name, rec->cr.cr_namelen);
496         /* Status and action flags */
497         rec->cr.cr_markerflags = mdd->mdd_cl.mc_flags | markerflags;
498
499         /* XXX: transaction is started by llog itself */
500         rc = (mdd->mdd_cl.mc_mask & (1 << CL_MARK)) ?
501                 mdd_changelog_llog_write(mdd, rec, NULL) : 0;
502
503         /* assume on or off event; reset repeat-access time */
504         mdd->mdd_cl.mc_starttime = cfs_time_current_64();
505
506         OBD_FREE(rec, reclen);
507         RETURN(rc);
508 }
509
510 /**
511  * Create ".lustre" directory.
512  */
513 static int create_dot_lustre_dir(const struct lu_env *env, struct mdd_device *m)
514 {
515         struct lu_fid *fid = &mdd_env_info(env)->mti_fid;
516         struct md_object *mdo;
517         int rc;
518
519         memcpy(fid, &LU_DOT_LUSTRE_FID, sizeof(struct lu_fid));
520         mdo = llo_store_create_index(env, &m->mdd_md_dev, m->mdd_child,
521                                      mdd_root_dir_name, dot_lustre_name,
522                                      fid, &dt_directory_features);
523         /* .lustre dir may be already present */
524         if (IS_ERR(mdo) && PTR_ERR(mdo) != -EEXIST) {
525                 rc = PTR_ERR(mdo);
526                 CERROR("creating obj [%s] fid = "DFID" rc = %d\n",
527                         dot_lustre_name, PFID(fid), rc);
528                 return rc;
529         }
530
531         if (!IS_ERR(mdo))
532                 lu_object_put(env, &mdo->mo_lu);
533
534         return 0;
535 }
536
537
538 static int dot_lustre_mdd_permission(const struct lu_env *env,
539                                      struct md_object *pobj,
540                                      struct md_object *cobj,
541                                      struct md_attr *attr, int mask)
542 {
543         if (mask & ~(MAY_READ | MAY_EXEC))
544                 return -EPERM;
545         else
546                 return 0;
547 }
548
549 static int dot_lustre_mdd_xattr_get(const struct lu_env *env,
550                                     struct md_object *obj, struct lu_buf *buf,
551                                     const char *name)
552 {
553         return 0;
554 }
555
556 static int dot_lustre_mdd_xattr_list(const struct lu_env *env,
557                                      struct md_object *obj, struct lu_buf *buf)
558 {
559         return 0;
560 }
561
562 static int dot_lustre_mdd_xattr_set(const struct lu_env *env,
563                                     struct md_object *obj,
564                                     const struct lu_buf *buf, const char *name,
565                                     int fl)
566 {
567         return -EPERM;
568 }
569
570 static int dot_lustre_mdd_xattr_del(const struct lu_env *env,
571                                     struct md_object *obj,
572                                     const char *name)
573 {
574         return -EPERM;
575 }
576
577 static int dot_lustre_mdd_readlink(const struct lu_env *env,
578                                    struct md_object *obj, struct lu_buf *buf)
579 {
580         return 0;
581 }
582
583 static int dot_lustre_mdd_object_create(const struct lu_env *env,
584                                         struct md_object *obj,
585                                         const struct md_op_spec *spec,
586                                         struct md_attr *ma)
587 {
588         return -EPERM;
589 }
590
591 static int dot_lustre_mdd_ref_add(const struct lu_env *env,
592                                   struct md_object *obj,
593                                   const struct md_attr *ma)
594 {
595         return -EPERM;
596 }
597
598 static int dot_lustre_mdd_ref_del(const struct lu_env *env,
599                                   struct md_object *obj,
600                                   struct md_attr *ma)
601 {
602         return -EPERM;
603 }
604
605 static int dot_lustre_mdd_open(const struct lu_env *env, struct md_object *obj,
606                                int flags)
607 {
608         struct mdd_object *mdd_obj = md2mdd_obj(obj);
609
610         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
611         mdd_obj->mod_count++;
612         mdd_write_unlock(env, mdd_obj);
613
614         return 0;
615 }
616
617 static int dot_lustre_mdd_close(const struct lu_env *env, struct md_object *obj,
618                                 struct md_attr *ma, int mode)
619 {
620         struct mdd_object *mdd_obj = md2mdd_obj(obj);
621
622         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
623         mdd_obj->mod_count--;
624         mdd_write_unlock(env, mdd_obj);
625
626         return 0;
627 }
628
629 static int dot_lustre_mdd_object_sync(const struct lu_env *env,
630                                       struct md_object *obj)
631 {
632         return -ENOSYS;
633 }
634
635 static int dot_lustre_mdd_path(const struct lu_env *env, struct md_object *obj,
636                            char *path, int pathlen, __u64 *recno, int *linkno)
637 {
638         return -ENOSYS;
639 }
640
641 static int dot_file_lock(const struct lu_env *env, struct md_object *obj,
642                          struct lov_mds_md *lmm, struct ldlm_extent *extent,
643                          struct lustre_handle *lockh)
644 {
645         return -ENOSYS;
646 }
647
648 static int dot_file_unlock(const struct lu_env *env, struct md_object *obj,
649                            struct lov_mds_md *lmm, struct lustre_handle *lockh)
650 {
651         return -ENOSYS;
652 }
653
654 static struct md_object_operations mdd_dot_lustre_obj_ops = {
655         .moo_permission    = dot_lustre_mdd_permission,
656         .moo_attr_get      = mdd_attr_get,
657         .moo_attr_set      = mdd_attr_set,
658         .moo_xattr_get     = dot_lustre_mdd_xattr_get,
659         .moo_xattr_list    = dot_lustre_mdd_xattr_list,
660         .moo_xattr_set     = dot_lustre_mdd_xattr_set,
661         .moo_xattr_del     = dot_lustre_mdd_xattr_del,
662         .moo_readpage      = mdd_readpage,
663         .moo_readlink      = dot_lustre_mdd_readlink,
664         .moo_object_create = dot_lustre_mdd_object_create,
665         .moo_ref_add       = dot_lustre_mdd_ref_add,
666         .moo_ref_del       = dot_lustre_mdd_ref_del,
667         .moo_open          = dot_lustre_mdd_open,
668         .moo_close         = dot_lustre_mdd_close,
669         .moo_capa_get      = mdd_capa_get,
670         .moo_object_sync   = dot_lustre_mdd_object_sync,
671         .moo_path          = dot_lustre_mdd_path,
672         .moo_file_lock     = dot_file_lock,
673         .moo_file_unlock   = dot_file_unlock,
674 };
675
676
677 static int dot_lustre_mdd_lookup(const struct lu_env *env, struct md_object *p,
678                                  const struct lu_name *lname, struct lu_fid *f,
679                                  struct md_op_spec *spec)
680 {
681         if (strcmp(lname->ln_name, mdd_obf_dir_name) == 0)
682                 *f = LU_OBF_FID;
683         else
684                 return -ENOENT;
685
686         return 0;
687 }
688
689 static mdl_mode_t dot_lustre_mdd_lock_mode(const struct lu_env *env,
690                                            struct md_object *obj,
691                                            mdl_mode_t mode)
692 {
693         return MDL_MINMODE;
694 }
695
696 static int dot_lustre_mdd_create(const struct lu_env *env,
697                                  struct md_object *pobj,
698                                  const struct lu_name *lname,
699                                  struct md_object *child,
700                                  struct md_op_spec *spec,
701                                  struct md_attr* ma)
702 {
703         return -EPERM;
704 }
705
706 static int dot_lustre_mdd_create_data(const struct lu_env *env,
707                                       struct md_object *p,
708                                       struct md_object *o,
709                                       const struct md_op_spec *spec,
710                                       struct md_attr *ma)
711 {
712         return -EPERM;
713 }
714
715 static int dot_lustre_mdd_rename(const struct lu_env *env,
716                                  struct md_object *src_pobj,
717                                  struct md_object *tgt_pobj,
718                                  const struct lu_fid *lf,
719                                  const struct lu_name *lsname,
720                                  struct md_object *tobj,
721                                  const struct lu_name *ltname,
722                                  struct md_attr *ma)
723 {
724         return -EPERM;
725 }
726
727 static int dot_lustre_mdd_link(const struct lu_env *env,
728                                struct md_object *tgt_obj,
729                                struct md_object *src_obj,
730                                const struct lu_name *lname,
731                                struct md_attr *ma)
732 {
733         return -EPERM;
734 }
735
736 static int dot_lustre_mdd_unlink(const struct lu_env *env,
737                                  struct md_object *pobj,
738                                  struct md_object *cobj,
739                                  const struct lu_name *lname,
740                                  struct md_attr *ma)
741 {
742         return -EPERM;
743 }
744
745 static int dot_lustre_mdd_name_insert(const struct lu_env *env,
746                                       struct md_object *obj,
747                                       const struct lu_name *lname,
748                                       const struct lu_fid *fid,
749                                       const struct md_attr *ma)
750 {
751         return -EPERM;
752 }
753
754 static int dot_lustre_mdd_name_remove(const struct lu_env *env,
755                                       struct md_object *obj,
756                                       const struct lu_name *lname,
757                                       const struct md_attr *ma)
758 {
759         return -EPERM;
760 }
761
762 static int dot_lustre_mdd_rename_tgt(const struct lu_env *env,
763                                      struct md_object *pobj,
764                                      struct md_object *tobj,
765                                      const struct lu_fid *fid,
766                                      const struct lu_name *lname,
767                                      struct md_attr *ma)
768 {
769         return -EPERM;
770 }
771
772
773 static struct md_dir_operations mdd_dot_lustre_dir_ops = {
774         .mdo_is_subdir   = mdd_is_subdir,
775         .mdo_lookup      = dot_lustre_mdd_lookup,
776         .mdo_lock_mode   = dot_lustre_mdd_lock_mode,
777         .mdo_create      = dot_lustre_mdd_create,
778         .mdo_create_data = dot_lustre_mdd_create_data,
779         .mdo_rename      = dot_lustre_mdd_rename,
780         .mdo_link        = dot_lustre_mdd_link,
781         .mdo_unlink      = dot_lustre_mdd_unlink,
782         .mdo_name_insert = dot_lustre_mdd_name_insert,
783         .mdo_name_remove = dot_lustre_mdd_name_remove,
784         .mdo_rename_tgt  = dot_lustre_mdd_rename_tgt,
785 };
786
787 static int obf_attr_get(const struct lu_env *env, struct md_object *obj,
788                         struct md_attr *ma)
789 {
790         struct mdd_device *mdd = mdo2mdd(obj);
791
792         /* "fid" is a virtual object and hence does not have any "real"
793          * attributes. So we reuse attributes of .lustre for "fid" dir */
794         return mdd_attr_get(env, &mdd->mdd_dot_lustre->mod_obj, ma);
795 }
796
797 static int obf_attr_set(const struct lu_env *env, struct md_object *obj,
798                         const struct md_attr *ma)
799 {
800         return -EPERM;
801 }
802
803 static int obf_xattr_list(const struct lu_env *env,
804                           struct md_object *obj, struct lu_buf *buf)
805 {
806         return 0;
807 }
808
809 static int obf_xattr_get(const struct lu_env *env,
810                          struct md_object *obj, struct lu_buf *buf,
811                          const char *name)
812 {
813         struct mdd_device *mdd = mdo2mdd(obj);
814         struct mdd_object *root;
815         struct lu_fid      rootfid;
816         int rc = 0;
817
818         /*
819          * .lustre returns default striping which is 'stored'
820          * in the root
821          */
822         if (strcmp(name, XATTR_NAME_LOV) == 0) {
823                 rc = dt_root_get(env, mdd->mdd_child, &rootfid);
824                 if (rc)
825                         return rc;
826                 root = mdd_object_find(env, mdd, &rootfid);
827                 if (IS_ERR(root))
828                         return PTR_ERR(root);
829                 rc = mdo_xattr_get(env, root, buf, name,
830                                    mdd_object_capa(env, md2mdd_obj(obj)));
831                 mdd_object_put(env, root);
832         }
833
834         return rc;
835 }
836
837 static int obf_xattr_set(const struct lu_env *env,
838                          struct md_object *obj,
839                          const struct lu_buf *buf, const char *name,
840                          int fl)
841 {
842         return -EPERM;
843 }
844
845 static int obf_xattr_del(const struct lu_env *env,
846                          struct md_object *obj,
847                          const char *name)
848 {
849         return -EPERM;
850 }
851
852 static int obf_mdd_open(const struct lu_env *env, struct md_object *obj,
853                         int flags)
854 {
855         struct mdd_object *mdd_obj = md2mdd_obj(obj);
856
857         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
858         mdd_obj->mod_count++;
859         mdd_write_unlock(env, mdd_obj);
860
861         return 0;
862 }
863
864 static int obf_mdd_close(const struct lu_env *env, struct md_object *obj,
865                          struct md_attr *ma, int mode)
866 {
867         struct mdd_object *mdd_obj = md2mdd_obj(obj);
868
869         mdd_write_lock(env, mdd_obj, MOR_TGT_CHILD);
870         mdd_obj->mod_count--;
871         mdd_write_unlock(env, mdd_obj);
872
873         return 0;
874 }
875
876 /** Nothing to list in "fid" directory */
877 static int obf_mdd_readpage(const struct lu_env *env, struct md_object *obj,
878                             const struct lu_rdpg *rdpg)
879 {
880         return -EPERM;
881 }
882
883 static int obf_path(const struct lu_env *env, struct md_object *obj,
884                     char *path, int pathlen, __u64 *recno, int *linkno)
885 {
886         return -ENOSYS;
887 }
888
889 static struct md_object_operations mdd_obf_obj_ops = {
890         .moo_attr_get    = obf_attr_get,
891         .moo_attr_set    = obf_attr_set,
892         .moo_xattr_list  = obf_xattr_list,
893         .moo_xattr_get   = obf_xattr_get,
894         .moo_xattr_set   = obf_xattr_set,
895         .moo_xattr_del   = obf_xattr_del,
896         .moo_open        = obf_mdd_open,
897         .moo_close       = obf_mdd_close,
898         .moo_readpage    = obf_mdd_readpage,
899         .moo_path        = obf_path
900 };
901
902 /**
903  * Lookup method for "fid" object. Only filenames with correct SEQ:OID format
904  * are valid. We also check if object with passed fid exists or not.
905  */
906 static int obf_lookup(const struct lu_env *env, struct md_object *p,
907                       const struct lu_name *lname, struct lu_fid *f,
908                       struct md_op_spec *spec)
909 {
910         char *name = (char *)lname->ln_name;
911         struct mdd_device *mdd = mdo2mdd(p);
912         struct mdd_object *child;
913         int rc = 0;
914
915         while (*name == '[')
916                 name++;
917
918         sscanf(name, SFID, RFID(f));
919         if (!fid_is_sane(f)) {
920                 CWARN("%s: bad FID format [%s], should be "DFID"\n",
921                       mdd2obd_dev(mdd)->obd_name, lname->ln_name,
922                       (__u64)FID_SEQ_NORMAL, 1, 0);
923                 GOTO(out, rc = -EINVAL);
924         }
925
926         if (!fid_is_norm(f)) {
927                 CWARN("%s: "DFID" is invalid, sequence should be "
928                       ">= "LPX64"\n", mdd2obd_dev(mdd)->obd_name, PFID(f),
929                       (__u64)FID_SEQ_NORMAL);
930                 GOTO(out, rc = -EINVAL);
931         }
932
933         /* Check if object with this fid exists */
934         child = mdd_object_find(env, mdd, f);
935         if (child == NULL)
936                 GOTO(out, rc = 0);
937         if (IS_ERR(child))
938                 GOTO(out, rc = PTR_ERR(child));
939
940         if (mdd_object_exists(child) == 0)
941                 rc = -ENOENT;
942
943         mdd_object_put(env, child);
944
945 out:
946         return rc;
947 }
948
949 static int obf_create(const struct lu_env *env, struct md_object *pobj,
950                       const struct lu_name *lname, struct md_object *child,
951                       struct md_op_spec *spec, struct md_attr* ma)
952 {
953         return -EPERM;
954 }
955
956 static int obf_rename(const struct lu_env *env,
957                       struct md_object *src_pobj, struct md_object *tgt_pobj,
958                       const struct lu_fid *lf, const struct lu_name *lsname,
959                       struct md_object *tobj, const struct lu_name *ltname,
960                       struct md_attr *ma)
961 {
962         return -EPERM;
963 }
964
965 static int obf_link(const struct lu_env *env, struct md_object *tgt_obj,
966                     struct md_object *src_obj, const struct lu_name *lname,
967                     struct md_attr *ma)
968 {
969         return -EPERM;
970 }
971
972 static int obf_unlink(const struct lu_env *env, struct md_object *pobj,
973                       struct md_object *cobj, const struct lu_name *lname,
974                       struct md_attr *ma)
975 {
976         return -EPERM;
977 }
978
979 static struct md_dir_operations mdd_obf_dir_ops = {
980         .mdo_lookup = obf_lookup,
981         .mdo_create = obf_create,
982         .mdo_rename = obf_rename,
983         .mdo_link   = obf_link,
984         .mdo_unlink = obf_unlink
985 };
986
987 /**
988  * Create special in-memory "fid" object for open-by-fid.
989  */
990 static int mdd_obf_setup(const struct lu_env *env, struct mdd_device *m)
991 {
992         struct mdd_object *mdd_obf;
993         struct lu_object *obf_lu_obj;
994         int rc = 0;
995
996         m->mdd_dot_lustre_objs.mdd_obf = mdd_object_find(env, m,
997                                                          &LU_OBF_FID);
998         if (m->mdd_dot_lustre_objs.mdd_obf == NULL ||
999             IS_ERR(m->mdd_dot_lustre_objs.mdd_obf))
1000                 GOTO(out, rc = -ENOENT);
1001
1002         mdd_obf = m->mdd_dot_lustre_objs.mdd_obf;
1003         mdd_obf->mod_obj.mo_dir_ops = &mdd_obf_dir_ops;
1004         mdd_obf->mod_obj.mo_ops = &mdd_obf_obj_ops;
1005         /* Don't allow objects to be created in "fid" dir */
1006         mdd_obf->mod_flags |= IMMUTE_OBJ;
1007
1008         obf_lu_obj = mdd2lu_obj(mdd_obf);
1009         obf_lu_obj->lo_header->loh_attr |= (LOHA_EXISTS | S_IFDIR);
1010
1011 out:
1012         return rc;
1013 }
1014
1015 /** Setup ".lustre" directory object */
1016 static int mdd_dot_lustre_setup(const struct lu_env *env, struct mdd_device *m)
1017 {
1018         struct dt_object *dt_dot_lustre;
1019         struct lu_fid *fid = &mdd_env_info(env)->mti_fid;
1020         int rc;
1021
1022         rc = create_dot_lustre_dir(env, m);
1023         if (rc)
1024                 return rc;
1025
1026         dt_dot_lustre = dt_store_open(env, m->mdd_child, mdd_root_dir_name,
1027                                       dot_lustre_name, fid);
1028         if (IS_ERR(dt_dot_lustre)) {
1029                 rc = PTR_ERR(dt_dot_lustre);
1030                 GOTO(out, rc);
1031         }
1032
1033         /* references are released in mdd_device_shutdown() */
1034         m->mdd_dot_lustre = lu2mdd_obj(lu_object_locate(dt_dot_lustre->do_lu.lo_header,
1035                                                         &mdd_device_type));
1036
1037         m->mdd_dot_lustre->mod_obj.mo_dir_ops = &mdd_dot_lustre_dir_ops;
1038         m->mdd_dot_lustre->mod_obj.mo_ops = &mdd_dot_lustre_obj_ops;
1039
1040         rc = mdd_obf_setup(env, m);
1041         if (rc)
1042                 CERROR("Error initializing \"fid\" object - %d.\n", rc);
1043
1044 out:
1045         RETURN(rc);
1046 }
1047
1048 static int mdd_process_config(const struct lu_env *env,
1049                               struct lu_device *d, struct lustre_cfg *cfg)
1050 {
1051         struct mdd_device *m    = lu2mdd_dev(d);
1052         struct dt_device  *dt   = m->mdd_child;
1053         struct lu_device  *next = &dt->dd_lu_dev;
1054         int rc;
1055         ENTRY;
1056
1057         switch (cfg->lcfg_command) {
1058         case LCFG_PARAM: {
1059                 struct lprocfs_static_vars lvars;
1060
1061                 lprocfs_mdd_init_vars(&lvars);
1062                 rc = class_process_proc_param(PARAM_MDD, lvars.obd_vars, cfg,m);
1063                 if (rc > 0 || rc == -ENOSYS)
1064                         /* we don't understand; pass it on */
1065                         rc = next->ld_ops->ldo_process_config(env, next, cfg);
1066                 break;
1067         }
1068         case LCFG_SETUP:
1069                 rc = next->ld_ops->ldo_process_config(env, next, cfg);
1070                 if (rc)
1071                         GOTO(out, rc);
1072                 dt->dd_ops->dt_conf_get(env, dt, &m->mdd_dt_conf);
1073
1074                 mdd_changelog_init(env, m);
1075                 break;
1076         case LCFG_CLEANUP:
1077                 rc = next->ld_ops->ldo_process_config(env, next, cfg);
1078                 lu_dev_del_linkage(d->ld_site, d);
1079                 mdd_device_shutdown(env, m, cfg);
1080                 break;
1081         default:
1082                 rc = next->ld_ops->ldo_process_config(env, next, cfg);
1083                 break;
1084         }
1085 out:
1086         RETURN(rc);
1087 }
1088
1089 static int mdd_recovery_complete(const struct lu_env *env,
1090                                  struct lu_device *d)
1091 {
1092         struct mdd_device *mdd = lu2mdd_dev(d);
1093         struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
1094         int rc;
1095         ENTRY;
1096
1097         LASSERT(mdd != NULL);
1098
1099         /* XXX: orphans handling. */
1100         __mdd_orphan_cleanup(env, mdd);
1101         rc = next->ld_ops->ldo_recovery_complete(env, next);
1102
1103         RETURN(rc);
1104 }
1105
1106 static int mdd_prepare(const struct lu_env *env,
1107                        struct lu_device *pdev,
1108                        struct lu_device *cdev)
1109 {
1110         struct mdd_device *mdd = lu2mdd_dev(cdev);
1111         struct lu_device *next = &mdd->mdd_child->dd_lu_dev;
1112         struct dt_object *root;
1113         struct lu_fid     fid;
1114         int rc;
1115
1116         ENTRY;
1117         rc = next->ld_ops->ldo_prepare(env, cdev, next);
1118         if (rc)
1119                 GOTO(out, rc);
1120
1121         root = dt_store_open(env, mdd->mdd_child, "", mdd_root_dir_name,
1122                              &mdd->mdd_root_fid);
1123         if (!IS_ERR(root)) {
1124                 LASSERT(root != NULL);
1125                 lu_object_put(env, &root->do_lu);
1126                 rc = orph_index_init(env, mdd);
1127         } else {
1128                 rc = PTR_ERR(root);
1129         }
1130         if (rc)
1131                 GOTO(out, rc);
1132
1133         rc = mdd_dot_lustre_setup(env, mdd);
1134         if (rc) {
1135                 CERROR("Error(%d) initializing .lustre objects\n", rc);
1136                 GOTO(out, rc);
1137         }
1138
1139         /* we use capa file to declare llog changes,
1140          * will be fixed with new llog in 2.3 */
1141         root = dt_store_open(env, mdd->mdd_child, "", CAPA_KEYS, &fid);
1142         if (IS_ERR(root))
1143                 GOTO(out, rc = PTR_ERR(root));
1144
1145         mdd->mdd_capa = root;
1146         rc = mdd_lfsck_setup(env, mdd);
1147
1148         GOTO(out, rc);
1149
1150 out:
1151         return rc;
1152 }
1153
1154 const struct lu_device_operations mdd_lu_ops = {
1155         .ldo_object_alloc      = mdd_object_alloc,
1156         .ldo_process_config    = mdd_process_config,
1157         .ldo_recovery_complete = mdd_recovery_complete,
1158         .ldo_prepare           = mdd_prepare,
1159 };
1160
1161 /*
1162  * No permission check is needed.
1163  */
1164 static int mdd_root_get(const struct lu_env *env,
1165                         struct md_device *m, struct lu_fid *f)
1166 {
1167         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1168
1169         ENTRY;
1170         *f = mdd->mdd_root_fid;
1171         RETURN(0);
1172 }
1173
1174 /*
1175  * No permission check is needed.
1176  */
1177 static int mdd_statfs(const struct lu_env *env, struct md_device *m,
1178                       struct obd_statfs *sfs)
1179 {
1180         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1181         int rc;
1182
1183         ENTRY;
1184
1185         rc = mdd_child_ops(mdd)->dt_statfs(env, mdd->mdd_child, sfs);
1186
1187         RETURN(rc);
1188 }
1189
1190 /*
1191  * No permission check is needed.
1192  */
1193 static int mdd_maxsize_get(const struct lu_env *env, struct md_device *m,
1194                            int *md_size, int *cookie_size)
1195 {
1196         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1197         ENTRY;
1198
1199         *md_size = mdd_lov_mdsize(env, mdd);
1200         *cookie_size = mdd_lov_cookiesize(env, mdd);
1201
1202         RETURN(0);
1203 }
1204
1205 static int mdd_init_capa_ctxt(const struct lu_env *env, struct md_device *m,
1206                               int mode, unsigned long timeout, __u32 alg,
1207                               struct lustre_capa_key *keys)
1208 {
1209         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1210         struct mds_obd    *mds = &mdd2obd_dev(mdd)->u.mds;
1211         int rc;
1212         ENTRY;
1213
1214         /* need barrier for mds_capa_keys access. */
1215         cfs_down_write(&mds->mds_notify_lock);
1216         mds->mds_capa_keys = keys;
1217         cfs_up_write(&mds->mds_notify_lock);
1218
1219         rc = mdd_child_ops(mdd)->dt_init_capa_ctxt(env, mdd->mdd_child, mode,
1220                                                    timeout, alg, keys);
1221         RETURN(rc);
1222 }
1223
1224 static int mdd_update_capa_key(const struct lu_env *env,
1225                                struct md_device *m,
1226                                struct lustre_capa_key *key)
1227 {
1228         /* we do not support capabilities ... */
1229         return -EINVAL;
1230 }
1231
1232 static int mdd_llog_ctxt_get(const struct lu_env *env, struct md_device *m,
1233                              int idx, void **h)
1234 {
1235         struct mdd_device *mdd = lu2mdd_dev(&m->md_lu_dev);
1236
1237         *h = llog_group_get_ctxt(&mdd2obd_dev(mdd)->obd_olg, idx);
1238         return (*h == NULL ? -ENOENT : 0);
1239 }
1240
1241 static struct lu_device *mdd_device_free(const struct lu_env *env,
1242                                          struct lu_device *lu)
1243 {
1244         struct mdd_device *m = lu2mdd_dev(lu);
1245         ENTRY;
1246
1247         LASSERT(cfs_atomic_read(&lu->ld_ref) == 0);
1248         md_device_fini(&m->mdd_md_dev);
1249         OBD_FREE_PTR(m);
1250         RETURN(NULL);
1251 }
1252
1253 static struct lu_device *mdd_device_alloc(const struct lu_env *env,
1254                                           struct lu_device_type *t,
1255                                           struct lustre_cfg *lcfg)
1256 {
1257         struct lu_device  *l;
1258         struct mdd_device *m;
1259
1260         OBD_ALLOC_PTR(m);
1261         if (m == NULL) {
1262                 l = ERR_PTR(-ENOMEM);
1263         } else {
1264                 int rc;
1265
1266                 l = mdd2lu_dev(m);
1267                 md_device_init(&m->mdd_md_dev, t);
1268                 rc = mdd_init0(env, m, t, lcfg);
1269                 if (rc != 0) {
1270                         mdd_device_free(env, l);
1271                         l = ERR_PTR(rc);
1272                 }
1273         }
1274
1275         return l;
1276 }
1277
1278 /*
1279  * we use exports to track all mdd users
1280  */
1281 static int mdd_obd_connect(const struct lu_env *env, struct obd_export **exp,
1282                            struct obd_device *obd, struct obd_uuid *cluuid,
1283                            struct obd_connect_data *data, void *localdata)
1284 {
1285         struct mdd_device    *mdd = lu2mdd_dev(obd->obd_lu_dev);
1286         struct lustre_handle  conn;
1287         int                   rc;
1288         ENTRY;
1289
1290         CDEBUG(D_CONFIG, "connect #%d\n", mdd->mdd_connects);
1291
1292         rc = class_connect(&conn, obd, cluuid);
1293         if (rc)
1294                 RETURN(rc);
1295
1296         *exp = class_conn2export(&conn);
1297
1298         /* Why should there ever be more than 1 connect? */
1299         LASSERT(mdd->mdd_connects == 0);
1300         mdd->mdd_connects++;
1301
1302         RETURN(0);
1303 }
1304
1305 /*
1306  * once last export (we don't count self-export) disappeared
1307  * mdd can be released
1308  */
1309 static int mdd_obd_disconnect(struct obd_export *exp)
1310 {
1311         struct obd_device *obd = exp->exp_obd;
1312         struct mdd_device *mdd = lu2mdd_dev(obd->obd_lu_dev);
1313         int                rc, release = 0;
1314         ENTRY;
1315
1316         mdd->mdd_connects--;
1317         if (mdd->mdd_connects == 0)
1318                 release = 1;
1319
1320         rc = class_disconnect(exp);
1321
1322         if (rc == 0 && release)
1323                 class_manual_cleanup(obd);
1324         RETURN(rc);
1325 }
1326
1327 static int mdd_obd_health_check(const struct lu_env *env,
1328                                 struct obd_device *obd)
1329 {
1330         struct mdd_device *mdd = lu2mdd_dev(obd->obd_lu_dev);
1331         int                rc;
1332         ENTRY;
1333
1334         LASSERT(mdd);
1335         rc = obd_health_check(env, mdd->mdd_child_exp->exp_obd);
1336         RETURN(rc);
1337 }
1338
1339 static struct obd_ops mdd_obd_device_ops = {
1340         .o_owner        = THIS_MODULE,
1341         .o_connect      = mdd_obd_connect,
1342         .o_disconnect   = mdd_obd_disconnect,
1343         .o_health_check = mdd_obd_health_check
1344 };
1345
1346 /* context key constructor/destructor: mdd_ucred_key_init, mdd_ucred_key_fini */
1347 LU_KEY_INIT_FINI(mdd_ucred, struct md_ucred);
1348
1349 static struct lu_context_key mdd_ucred_key = {
1350         .lct_tags = LCT_SESSION,
1351         .lct_init = mdd_ucred_key_init,
1352         .lct_fini = mdd_ucred_key_fini
1353 };
1354
1355 struct md_ucred *md_ucred(const struct lu_env *env)
1356 {
1357         LASSERT(env->le_ses != NULL);
1358         return lu_context_key_get(env->le_ses, &mdd_ucred_key);
1359 }
1360 EXPORT_SYMBOL(md_ucred);
1361
1362 /*
1363  * context key constructor/destructor:
1364  * mdd_capainfo_key_init, mdd_capainfo_key_fini
1365  */
1366 LU_KEY_INIT_FINI(mdd_capainfo, struct md_capainfo);
1367
1368 struct lu_context_key mdd_capainfo_key = {
1369         .lct_tags = LCT_SESSION,
1370         .lct_init = mdd_capainfo_key_init,
1371         .lct_fini = mdd_capainfo_key_fini
1372 };
1373
1374 struct md_capainfo *md_capainfo(const struct lu_env *env)
1375 {
1376         /* NB, in mdt_init0 */
1377         if (env->le_ses == NULL)
1378                 return NULL;
1379         return lu_context_key_get(env->le_ses, &mdd_capainfo_key);
1380 }
1381 EXPORT_SYMBOL(md_capainfo);
1382
1383 static int mdd_changelog_user_register(struct mdd_device *mdd, int *id)
1384 {
1385         struct llog_ctxt *ctxt;
1386         struct llog_changelog_user_rec *rec;
1387         int rc;
1388         ENTRY;
1389
1390         ctxt = llog_get_context(mdd2obd_dev(mdd),LLOG_CHANGELOG_USER_ORIG_CTXT);
1391         if (ctxt == NULL)
1392                 RETURN(-ENXIO);
1393
1394         OBD_ALLOC_PTR(rec);
1395         if (rec == NULL) {
1396                 llog_ctxt_put(ctxt);
1397                 RETURN(-ENOMEM);
1398         }
1399
1400         /* Assume we want it on since somebody registered */
1401         rc = mdd_changelog_on(mdd, 1);
1402         if (rc)
1403                 GOTO(out, rc);
1404
1405         rec->cur_hdr.lrh_len = sizeof(*rec);
1406         rec->cur_hdr.lrh_type = CHANGELOG_USER_REC;
1407         cfs_spin_lock(&mdd->mdd_cl.mc_user_lock);
1408         if (mdd->mdd_cl.mc_lastuser == (unsigned int)(-1)) {
1409                 cfs_spin_unlock(&mdd->mdd_cl.mc_user_lock);
1410                 CERROR("Maximum number of changelog users exceeded!\n");
1411                 GOTO(out, rc = -EOVERFLOW);
1412         }
1413         *id = rec->cur_id = ++mdd->mdd_cl.mc_lastuser;
1414         rec->cur_endrec = mdd->mdd_cl.mc_index;
1415         cfs_spin_unlock(&mdd->mdd_cl.mc_user_lock);
1416
1417         rc = llog_obd_add(NULL, ctxt, &rec->cur_hdr, NULL, NULL, 0);
1418
1419         CDEBUG(D_IOCTL, "Registered changelog user %d\n", *id);
1420 out:
1421         OBD_FREE_PTR(rec);
1422         llog_ctxt_put(ctxt);
1423         RETURN(rc);
1424 }
1425
1426 int mdd_declare_llog_cancel(const struct lu_env *env, struct mdd_device *mdd,
1427                             struct thandle *handle)
1428 {
1429         int rc;
1430
1431
1432         /* XXX: this is a temporary solution to declare llog changes
1433          *      will be fixed in 2.3 with new llog implementation */
1434
1435         LASSERT(mdd->mdd_capa);
1436
1437         /* the llog record could be canceled either by modifying
1438          * the plain llog's header or by destroying the llog itself
1439          * when this record is the last one in it, it can't be known
1440          * here, but the catlog's header will also be modified for
1441          * the second case, then the first case can be covered and
1442          * is no need to declare it */
1443
1444         /* destroy empty plain log */
1445         rc = dt_declare_destroy(env, mdd->mdd_capa, handle);
1446         if (rc)
1447                 return rc;
1448
1449         /* record the catlog's header if an empty plain log was destroyed */
1450         rc = dt_declare_record_write(env, mdd->mdd_capa,
1451                                      sizeof(struct llog_logid_rec), 0, handle);
1452         return rc;
1453 }
1454
1455 struct mdd_changelog_user_data {
1456         __u64 mcud_endrec; /**< purge record for this user */
1457         __u64 mcud_minrec; /**< lowest changelog recno still referenced */
1458         __u32 mcud_id;
1459         __u32 mcud_minid;  /**< user id with lowest rec reference */
1460         __u32 mcud_usercount;
1461         int   mcud_found:1;
1462         struct mdd_device   *mcud_mdd;
1463 };
1464 #define MCUD_UNREGISTER -1LL
1465
1466 /** Two things:
1467  * 1. Find the smallest record everyone is willing to purge
1468  * 2. Update the last purgeable record for this user
1469  */
1470 static int mdd_changelog_user_purge_cb(const struct lu_env *env,
1471                                        struct llog_handle *llh,
1472                                        struct llog_rec_hdr *hdr, void *data)
1473 {
1474         struct llog_changelog_user_rec  *rec;
1475         struct mdd_changelog_user_data  *mcud = data;
1476         int                              rc;
1477
1478         ENTRY;
1479
1480         LASSERT(llh->lgh_hdr->llh_flags & LLOG_F_IS_PLAIN);
1481
1482         rec = (struct llog_changelog_user_rec *)hdr;
1483
1484         mcud->mcud_usercount++;
1485
1486         /* If we have a new endrec for this id, use it for the following
1487            min check instead of its old value */
1488         if (rec->cur_id == mcud->mcud_id)
1489                 rec->cur_endrec = max(rec->cur_endrec, mcud->mcud_endrec);
1490
1491         /* Track the minimum referenced record */
1492         if (mcud->mcud_minid == 0 || mcud->mcud_minrec > rec->cur_endrec) {
1493                 mcud->mcud_minid = rec->cur_id;
1494                 mcud->mcud_minrec = rec->cur_endrec;
1495         }
1496
1497         if (rec->cur_id != mcud->mcud_id)
1498                 RETURN(0);
1499
1500         /* Update this user's record */
1501         mcud->mcud_found = 1;
1502
1503         /* Special case: unregister this user */
1504         if (mcud->mcud_endrec == MCUD_UNREGISTER) {
1505                 struct llog_cookie cookie;
1506                 void *th;
1507                 struct mdd_device *mdd = mcud->mcud_mdd;
1508
1509                 cookie.lgc_lgl = llh->lgh_id;
1510                 cookie.lgc_index = hdr->lrh_index;
1511
1512                 /* XXX This is a workaround for the deadlock of changelog
1513                  * adding vs. changelog cancelling. LU-81. */
1514                 th = mdd_trans_create(env, mdd);
1515                 if (IS_ERR(th)) {
1516                         CERROR("Cannot get thandle\n");
1517                         RETURN(-ENOMEM);
1518                 }
1519
1520                 rc = mdd_declare_llog_cancel(env, mdd, th);
1521                 if (rc)
1522                         GOTO(stop, rc);
1523
1524                 rc = mdd_trans_start(env, mdd, th);
1525                 if (rc)
1526                         GOTO(stop, rc);
1527
1528                 rc = llog_cat_cancel_records(env, llh->u.phd.phd_cat_handle,
1529                                              1, &cookie);
1530                 if (rc == 0)
1531                         mcud->mcud_usercount--;
1532
1533 stop:
1534                 mdd_trans_stop(env, mdd, 0, th);
1535                 RETURN(rc);
1536         }
1537
1538         /* Update the endrec */
1539         CDEBUG(D_IOCTL, "Rewriting changelog user %d endrec to "LPU64"\n",
1540                mcud->mcud_id, rec->cur_endrec);
1541
1542         /* hdr+1 is loc of data */
1543         hdr->lrh_len -= sizeof(*hdr) + sizeof(struct llog_rec_tail);
1544         rc = llog_write_rec(env, llh, hdr, NULL, 0, (void *)(hdr + 1),
1545                             hdr->lrh_index, NULL);
1546
1547         RETURN(rc);
1548 }
1549
1550 static int mdd_changelog_user_purge(const struct lu_env *env,
1551                                     struct mdd_device *mdd, int id,
1552                                     long long endrec)
1553 {
1554         struct mdd_changelog_user_data data;
1555         struct llog_ctxt *ctxt;
1556         int rc;
1557         ENTRY;
1558
1559         CDEBUG(D_IOCTL, "Purge request: id=%d, endrec=%lld\n", id, endrec);
1560
1561         data.mcud_id = id;
1562         data.mcud_minid = 0;
1563         data.mcud_minrec = 0;
1564         data.mcud_usercount = 0;
1565         data.mcud_endrec = endrec;
1566         data.mcud_mdd = mdd;
1567         cfs_spin_lock(&mdd->mdd_cl.mc_lock);
1568         endrec = mdd->mdd_cl.mc_index;
1569         cfs_spin_unlock(&mdd->mdd_cl.mc_lock);
1570         if ((data.mcud_endrec == 0) ||
1571             ((data.mcud_endrec > endrec) &&
1572              (data.mcud_endrec != MCUD_UNREGISTER)))
1573                 data.mcud_endrec = endrec;
1574
1575         ctxt = llog_get_context(mdd2obd_dev(mdd),LLOG_CHANGELOG_USER_ORIG_CTXT);
1576         if (ctxt == NULL)
1577                 return -ENXIO;
1578         LASSERT(ctxt->loc_handle->lgh_hdr->llh_flags & LLOG_F_IS_CAT);
1579
1580         rc = llog_cat_process(env, ctxt->loc_handle,
1581                               mdd_changelog_user_purge_cb, (void *)&data,
1582                               0, 0);
1583         if ((rc >= 0) && (data.mcud_minrec > 0)) {
1584                 CDEBUG(D_IOCTL, "Purging changelog entries up to "LPD64
1585                        ", referenced by "CHANGELOG_USER_PREFIX"%d\n",
1586                        data.mcud_minrec, data.mcud_minid);
1587                 rc = mdd_changelog_llog_cancel(env, mdd, data.mcud_minrec);
1588         } else {
1589                 CWARN("Could not determine changelog records to purge; rc=%d\n",
1590                       rc);
1591         }
1592
1593         llog_ctxt_put(ctxt);
1594
1595         if (!data.mcud_found) {
1596                 CWARN("No entry for user %d.  Last changelog reference is "
1597                       LPD64" by changelog user %d\n", data.mcud_id,
1598                       data.mcud_minrec, data.mcud_minid);
1599                rc = -ENOENT;
1600         }
1601
1602         if (!rc && data.mcud_usercount == 0)
1603                 /* No more users; turn changelogs off */
1604                 rc = mdd_changelog_on(mdd, 0);
1605
1606         RETURN (rc);
1607 }
1608
1609 /** mdd_iocontrol
1610  * May be called remotely from mdt_iocontrol_handle or locally from
1611  * mdt_iocontrol. Data may be freeform - remote handling doesn't enforce
1612  * an obd_ioctl_data format (but local ioctl handler does).
1613  * \param cmd - ioc
1614  * \param len - data len
1615  * \param karg - ioctl data, in kernel space
1616  */
1617 static int mdd_iocontrol(const struct lu_env *env, struct md_device *m,
1618                          unsigned int cmd, int len, void *karg)
1619 {
1620         struct mdd_device *mdd;
1621         struct obd_ioctl_data *data = karg;
1622         int rc;
1623         ENTRY;
1624
1625         mdd = lu2mdd_dev(&m->md_lu_dev);
1626
1627         /* Doesn't use obd_ioctl_data */
1628         switch (cmd) {
1629         case OBD_IOC_CHANGELOG_CLEAR: {
1630                 struct changelog_setinfo *cs = karg;
1631                 rc = mdd_changelog_user_purge(env, mdd, cs->cs_id,
1632                                               cs->cs_recno);
1633                 RETURN(rc);
1634         }
1635         case OBD_IOC_GET_MNTOPT: {
1636                 mntopt_t *mntopts = (mntopt_t *)karg;
1637                 *mntopts = mdd->mdd_dt_conf.ddp_mntopts;
1638                 RETURN(0);
1639         }
1640         case OBD_IOC_START_LFSCK: {
1641                 struct lfsck_start *start = karg;
1642                 struct md_lfsck *lfsck = &mdd->mdd_lfsck;
1643
1644                 /* Return the kernel service version. */
1645                 /* XXX: version can be used for compatibility in the future. */
1646                 start->ls_version = lfsck->ml_version;
1647                 rc = mdd_lfsck_start(env, lfsck, start);
1648                 RETURN(rc);
1649         }
1650         case OBD_IOC_STOP_LFSCK: {
1651                 rc = mdd_lfsck_stop(env, &mdd->mdd_lfsck);
1652                 RETURN(rc);
1653         }
1654         }
1655
1656         /* Below ioctls use obd_ioctl_data */
1657         if (len != sizeof(*data)) {
1658                 CERROR("Bad ioctl size %d\n", len);
1659                 RETURN(-EINVAL);
1660         }
1661         if (data->ioc_version != OBD_IOCTL_VERSION) {
1662                 CERROR("Bad magic %x != %x\n", data->ioc_version,
1663                        OBD_IOCTL_VERSION);
1664                 RETURN(-EINVAL);
1665         }
1666
1667         switch (cmd) {
1668         case OBD_IOC_CHANGELOG_REG:
1669                 rc = mdd_changelog_user_register(mdd, &data->ioc_u32_1);
1670                 break;
1671         case OBD_IOC_CHANGELOG_DEREG:
1672                 rc = mdd_changelog_user_purge(env, mdd, data->ioc_u32_1,
1673                                               MCUD_UNREGISTER);
1674                 break;
1675         default:
1676                 rc = -ENOTTY;
1677         }
1678
1679         RETURN (rc);
1680 }
1681
1682 /* type constructor/destructor: mdd_type_init, mdd_type_fini */
1683 LU_TYPE_INIT_FINI(mdd, &mdd_thread_key, &mdd_ucred_key, &mdd_capainfo_key);
1684
1685 const struct md_device_operations mdd_ops = {
1686         .mdo_statfs         = mdd_statfs,
1687         .mdo_root_get       = mdd_root_get,
1688         .mdo_maxsize_get    = mdd_maxsize_get,
1689         .mdo_init_capa_ctxt = mdd_init_capa_ctxt,
1690         .mdo_update_capa_key= mdd_update_capa_key,
1691         .mdo_llog_ctxt_get  = mdd_llog_ctxt_get,
1692         .mdo_iocontrol      = mdd_iocontrol,
1693 };
1694
1695 static struct lu_device_type_operations mdd_device_type_ops = {
1696         .ldto_init = mdd_type_init,
1697         .ldto_fini = mdd_type_fini,
1698
1699         .ldto_start = mdd_type_start,
1700         .ldto_stop  = mdd_type_stop,
1701
1702         .ldto_device_alloc = mdd_device_alloc,
1703         .ldto_device_free  = mdd_device_free,
1704
1705         .ldto_device_fini    = mdd_device_fini
1706 };
1707
1708 static struct lu_device_type mdd_device_type = {
1709         .ldt_tags     = LU_DEVICE_MD,
1710         .ldt_name     = LUSTRE_MDD_NAME,
1711         .ldt_ops      = &mdd_device_type_ops,
1712         .ldt_ctx_tags = LCT_MD_THREAD
1713 };
1714
1715 /* context key constructor: mdd_key_init */
1716 LU_KEY_INIT(mdd, struct mdd_thread_info);
1717
1718 static void mdd_key_fini(const struct lu_context *ctx,
1719                          struct lu_context_key *key, void *data)
1720 {
1721         struct mdd_thread_info *info = data;
1722         if (info->mti_max_lmm != NULL)
1723                 OBD_FREE(info->mti_max_lmm, info->mti_max_lmm_size);
1724         if (info->mti_max_cookie != NULL)
1725                 OBD_FREE(info->mti_max_cookie, info->mti_max_cookie_size);
1726         mdd_buf_put(&info->mti_big_buf);
1727
1728         OBD_FREE_PTR(info);
1729 }
1730
1731 /* context key: mdd_thread_key */
1732 LU_CONTEXT_KEY_DEFINE(mdd, LCT_MD_THREAD);
1733
1734 static struct lu_local_obj_desc llod_capa_key = {
1735         .llod_name      = CAPA_KEYS,
1736         .llod_oid       = MDD_CAPA_KEYS_OID,
1737         .llod_is_index  = 0,
1738 };
1739
1740 static struct lu_local_obj_desc llod_mdd_orphan = {
1741         .llod_name      = orph_index_name,
1742         .llod_oid       = MDD_ORPHAN_OID,
1743         .llod_is_index  = 1,
1744         .llod_feat      = &dt_directory_features,
1745 };
1746
1747 static struct lu_local_obj_desc llod_mdd_root = {
1748         .llod_name      = mdd_root_dir_name,
1749         .llod_oid       = MDD_ROOT_INDEX_OID,
1750         .llod_is_index  = 1,
1751         .llod_feat      = &dt_directory_features,
1752 };
1753
1754 static struct lu_local_obj_desc llod_lfsck_bookmark = {
1755         .llod_name      = lfsck_bookmark_name,
1756         .llod_oid       = LFSCK_BOOKMARK_OID,
1757         .llod_is_index  = 0,
1758 };
1759
1760 static int __init mdd_mod_init(void)
1761 {
1762         struct lprocfs_static_vars lvars;
1763         int rc;
1764
1765         lprocfs_mdd_init_vars(&lvars);
1766
1767         rc = lu_kmem_init(mdd_caches);
1768         if (rc)
1769                 return rc;
1770
1771         llo_local_obj_register(&llod_capa_key);
1772         llo_local_obj_register(&llod_mdd_orphan);
1773         llo_local_obj_register(&llod_mdd_root);
1774         llo_local_obj_register(&llod_lfsck_bookmark);
1775
1776         rc = class_register_type(&mdd_obd_device_ops, NULL, lvars.module_vars,
1777                                  LUSTRE_MDD_NAME, &mdd_device_type);
1778         if (rc)
1779                 lu_kmem_fini(mdd_caches);
1780         return rc;
1781 }
1782
1783 static void __exit mdd_mod_exit(void)
1784 {
1785         llo_local_obj_unregister(&llod_capa_key);
1786         llo_local_obj_unregister(&llod_mdd_orphan);
1787         llo_local_obj_unregister(&llod_mdd_root);
1788         llo_local_obj_unregister(&llod_lfsck_bookmark);
1789
1790         class_unregister_type(LUSTRE_MDD_NAME);
1791         lu_kmem_fini(mdd_caches);
1792 }
1793
1794 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
1795 MODULE_DESCRIPTION("Lustre Meta-data Device Prototype ("LUSTRE_MDD_NAME")");
1796 MODULE_LICENSE("GPL");
1797
1798 cfs_module(mdd, "0.1.0", mdd_mod_init, mdd_mod_exit);