Whamcloud - gitweb
LU-3030 build: Update Master Copyrights pre 2.4 split
[fs/lustre-release.git] / lustre / ofd / ofd_fs.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2012, 2013, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  *
36  * lustre/ofd/ofd_fs.c
37  *
38  * Author: Alexey Zhuravlev <bzzz@whamcloud.com>
39  * Author: Mikhail Pershin <tappro@whamcloud.com>
40  */
41
42 #define DEBUG_SUBSYSTEM S_FILTER
43
44 #include "ofd_internal.h"
45
46 int ofd_record_write(const struct lu_env *env, struct ofd_device *ofd,
47                      struct dt_object *dt, struct lu_buf *buf, loff_t *off)
48 {
49         struct thandle  *th;
50         int              rc;
51
52         ENTRY;
53
54         LASSERT(dt);
55
56         th = dt_trans_create(env, ofd->ofd_osd);
57         if (IS_ERR(th))
58                 RETURN(PTR_ERR(th));
59
60         rc = dt_declare_record_write(env, dt, buf->lb_len, *off, th);
61         if (rc == 0) {
62                 rc = dt_trans_start_local(env, ofd->ofd_osd, th);
63                 if (rc == 0)
64                         rc = dt_record_write(env, dt, buf, off, th);
65         }
66         dt_trans_stop(env, ofd->ofd_osd, th);
67
68         RETURN(rc);
69 }
70
71 int ofd_precreate_batch(struct ofd_device *ofd, int batch)
72 {
73         int count;
74
75         spin_lock(&ofd->ofd_batch_lock);
76         count = min(ofd->ofd_precreate_batch, batch);
77         spin_unlock(&ofd->ofd_batch_lock);
78
79         return count;
80 }
81
82 struct ofd_seq *ofd_seq_get(struct ofd_device *ofd, obd_seq seq)
83 {
84         struct ofd_seq *oseq;
85
86         read_lock(&ofd->ofd_seq_list_lock);
87         cfs_list_for_each_entry(oseq, &ofd->ofd_seq_list, os_list) {
88                 if (ostid_seq(&oseq->os_oi) == seq) {
89                         cfs_atomic_inc(&oseq->os_refc);
90                         read_unlock(&ofd->ofd_seq_list_lock);
91                         return oseq;
92                 }
93         }
94         read_unlock(&ofd->ofd_seq_list_lock);
95         return NULL;
96 }
97
98 static void ofd_seq_destroy(const struct lu_env *env,
99                             struct ofd_seq *oseq)
100 {
101         LASSERT(cfs_list_empty(&oseq->os_list));
102         LASSERT(oseq->os_lastid_obj != NULL);
103         lu_object_put(env, &oseq->os_lastid_obj->do_lu);
104         OBD_FREE_PTR(oseq);
105 }
106
107 void ofd_seq_put(const struct lu_env *env, struct ofd_seq *oseq)
108 {
109         if (cfs_atomic_dec_and_test(&oseq->os_refc))
110                 ofd_seq_destroy(env, oseq);
111 }
112
113 static void ofd_seq_delete(const struct lu_env *env, struct ofd_seq *oseq)
114 {
115         cfs_list_del_init(&oseq->os_list);
116         ofd_seq_put(env, oseq);
117 }
118
119 /**
120  * Add a new sequence to the OFD device.
121  *
122  * \param ofd OFD device
123  * \param new_seq new sequence to be added
124  *
125  * \retval the seq to be added or the existing seq
126  **/
127 static struct ofd_seq *ofd_seq_add(const struct lu_env *env,
128                                    struct ofd_device *ofd,
129                                    struct ofd_seq *new_seq)
130 {
131         struct ofd_seq *os = NULL;
132
133         write_lock(&ofd->ofd_seq_list_lock);
134         cfs_list_for_each_entry(os, &ofd->ofd_seq_list, os_list) {
135                 if (ostid_seq(&os->os_oi) == ostid_seq(&new_seq->os_oi)) {
136                         cfs_atomic_inc(&os->os_refc);
137                         write_unlock(&ofd->ofd_seq_list_lock);
138                         /* The seq has not been added to the list */
139                         ofd_seq_put(env, new_seq);
140                         return os;
141                 }
142         }
143         cfs_atomic_inc(&new_seq->os_refc);
144         cfs_list_add_tail(&new_seq->os_list, &ofd->ofd_seq_list);
145         ofd->ofd_seq_count++;
146         write_unlock(&ofd->ofd_seq_list_lock);
147         return new_seq;
148 }
149
150 obd_id ofd_seq_last_oid(struct ofd_seq *oseq)
151 {
152         obd_id id;
153
154         spin_lock(&oseq->os_last_oid_lock);
155         id = ostid_id(&oseq->os_oi);
156         spin_unlock(&oseq->os_last_oid_lock);
157
158         return id;
159 }
160
161 void ofd_seq_last_oid_set(struct ofd_seq *oseq, obd_id id)
162 {
163         spin_lock(&oseq->os_last_oid_lock);
164         if (likely(ostid_id(&oseq->os_oi) < id))
165                 ostid_set_id(&oseq->os_oi, id);
166         spin_unlock(&oseq->os_last_oid_lock);
167 }
168
169 int ofd_seq_last_oid_write(const struct lu_env *env, struct ofd_device *ofd,
170                            struct ofd_seq *oseq)
171 {
172         struct ofd_thread_info  *info = ofd_info(env);
173         obd_id                   tmp;
174         int                      rc;
175
176         ENTRY;
177
178         tmp = cpu_to_le64(ofd_seq_last_oid(oseq));
179
180         info->fti_buf.lb_buf = &tmp;
181         info->fti_buf.lb_len = sizeof(tmp);
182         info->fti_off = 0;
183
184         rc = ofd_record_write(env, ofd, oseq->os_lastid_obj, &info->fti_buf,
185                               &info->fti_off);
186
187         CDEBUG(D_INODE, "%s: write last_objid "DOSTID": rc = %d\n",
188                ofd_name(ofd), POSTID(&oseq->os_oi), rc);
189
190         RETURN(rc);
191 }
192
193 static void ofd_deregister_seq_exp(struct ofd_device *ofd)
194 {
195         struct seq_server_site  *ss = &ofd->ofd_seq_site;
196
197         if (ss->ss_client_seq != NULL) {
198                 lustre_deregister_lwp_item(&ss->ss_client_seq->lcs_exp);
199                 ss->ss_client_seq->lcs_exp = NULL;
200         }
201
202         if (ss->ss_server_fld != NULL) {
203                 lustre_deregister_lwp_item(&ss->ss_server_fld->lsf_control_exp);
204                 ss->ss_server_fld->lsf_control_exp = NULL;
205         }
206 }
207
208 static int ofd_fld_fini(const struct lu_env *env,
209                         struct ofd_device *ofd)
210 {
211         struct seq_server_site *ss = &ofd->ofd_seq_site;
212         ENTRY;
213
214         if (ss && ss->ss_server_fld) {
215                 fld_server_fini(env, ss->ss_server_fld);
216                 OBD_FREE_PTR(ss->ss_server_fld);
217                 ss->ss_server_fld = NULL;
218         }
219
220         RETURN(0);
221 }
222
223 void ofd_seqs_fini(const struct lu_env *env, struct ofd_device *ofd)
224 {
225         struct ofd_seq  *oseq;
226         struct ofd_seq  *tmp;
227         cfs_list_t       dispose;
228         int             rc;
229
230         ofd_deregister_seq_exp(ofd);
231
232         rc = ofd_fid_fini(env, ofd);
233         if (rc != 0)
234                 CERROR("%s: fid fini error: rc = %d\n", ofd_name(ofd), rc);
235
236         rc = ofd_fld_fini(env, ofd);
237         if (rc != 0)
238                 CERROR("%s: fld fini error: rc = %d\n", ofd_name(ofd), rc);
239
240         CFS_INIT_LIST_HEAD(&dispose);
241         write_lock(&ofd->ofd_seq_list_lock);
242         cfs_list_for_each_entry_safe(oseq, tmp, &ofd->ofd_seq_list, os_list) {
243                 cfs_list_move(&oseq->os_list, &dispose);
244         }
245         write_unlock(&ofd->ofd_seq_list_lock);
246
247         while (!cfs_list_empty(&dispose)) {
248                 oseq = container_of0(dispose.next, struct ofd_seq, os_list);
249                 ofd_seq_delete(env, oseq);
250         }
251
252         LASSERT(cfs_list_empty(&ofd->ofd_seq_list));
253         return;
254 }
255
256 /**
257  *
258  * \retval the seq with seq number or errno (never NULL)
259  */
260 struct ofd_seq *ofd_seq_load(const struct lu_env *env, struct ofd_device *ofd,
261                              obd_seq seq)
262 {
263         struct ofd_thread_info  *info = ofd_info(env);
264         struct ofd_seq          *oseq = NULL;
265         struct dt_object        *dob;
266         obd_id                   lastid;
267         int                      rc;
268
269         ENTRY;
270
271         /* if seq is already initialized */
272         oseq = ofd_seq_get(ofd, seq);
273         if (oseq != NULL)
274                 RETURN(oseq);
275
276         OBD_ALLOC_PTR(oseq);
277         if (oseq == NULL)
278                 RETURN(ERR_PTR(-ENOMEM));
279
280         lu_last_id_fid(&info->fti_fid, seq);
281         memset(&info->fti_attr, 0, sizeof(info->fti_attr));
282         info->fti_attr.la_valid = LA_MODE;
283         info->fti_attr.la_mode = S_IFREG |  S_IRUGO | S_IWUSR;
284         info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG);
285
286         /* create object tracking per-seq last created
287          * id to be used by orphan recovery mechanism */
288         dob = dt_find_or_create(env, ofd->ofd_osd, &info->fti_fid,
289                                 &info->fti_dof, &info->fti_attr);
290         if (IS_ERR(dob)) {
291                 OBD_FREE_PTR(oseq);
292                 RETURN((void *)dob);
293         }
294
295         oseq->os_lastid_obj = dob;
296
297         CFS_INIT_LIST_HEAD(&oseq->os_list);
298         mutex_init(&oseq->os_create_lock);
299         spin_lock_init(&oseq->os_last_oid_lock);
300         ostid_set_seq(&oseq->os_oi, seq);
301
302         cfs_atomic_set(&oseq->os_refc, 1);
303
304         rc = dt_attr_get(env, dob, &info->fti_attr, BYPASS_CAPA);
305         if (rc)
306                 GOTO(cleanup, rc);
307
308         if (info->fti_attr.la_size == 0) {
309                 /* object is just created, initialize last id */
310                 ofd_seq_last_oid_set(oseq, OFD_INIT_OBJID);
311                 ofd_seq_last_oid_write(env, ofd, oseq);
312         } else if (info->fti_attr.la_size == sizeof(lastid)) {
313                 info->fti_off = 0;
314                 info->fti_buf.lb_buf = &lastid;
315                 info->fti_buf.lb_len = sizeof(lastid);
316
317                 rc = dt_record_read(env, dob, &info->fti_buf, &info->fti_off);
318                 if (rc) {
319                         CERROR("%s: can't read last_id: rc = %d\n",
320                                 ofd_name(ofd), rc);
321                         GOTO(cleanup, rc);
322                 }
323                 ofd_seq_last_oid_set(oseq, le64_to_cpu(lastid));
324         } else {
325                 CERROR("%s: corrupted size "LPU64" LAST_ID of seq "LPX64"\n",
326                         ofd_name(ofd), (__u64)info->fti_attr.la_size, seq);
327                 GOTO(cleanup, rc = -EINVAL);
328         }
329
330         oseq = ofd_seq_add(env, ofd, oseq);
331         RETURN((oseq != NULL) ? oseq : ERR_PTR(-ENOENT));
332 cleanup:
333         ofd_seq_put(env, oseq);
334         return ERR_PTR(rc);
335 }
336
337 static int ofd_fld_init(const struct lu_env *env, const char *uuid,
338                         struct ofd_device *ofd)
339 {
340         struct seq_server_site *ss = &ofd->ofd_seq_site;
341         int rc;
342         ENTRY;
343
344         OBD_ALLOC_PTR(ss->ss_server_fld);
345         if (ss->ss_server_fld == NULL)
346                 RETURN(rc = -ENOMEM);
347
348         rc = fld_server_init(env, ss->ss_server_fld, ofd->ofd_osd, uuid,
349                              ss->ss_node_id, LU_SEQ_RANGE_OST);
350         if (rc) {
351                 OBD_FREE_PTR(ss->ss_server_fld);
352                 ss->ss_server_fld = NULL;
353                 RETURN(rc);
354         }
355         RETURN(0);
356 }
357
358 static int ofd_register_seq_exp(struct ofd_device *ofd)
359 {
360         struct seq_server_site  *ss = &ofd->ofd_seq_site;
361         char                    *lwp_name = NULL;
362         int                     rc;
363
364         OBD_ALLOC(lwp_name, MAX_OBD_NAME);
365         if (lwp_name == NULL)
366                 GOTO(out_free, rc = -ENOMEM);
367
368         rc = tgt_name2lwpname(ofd_name(ofd), lwp_name);
369         if (rc != 0)
370                 GOTO(out_free, rc);
371
372         rc = lustre_register_lwp_item(lwp_name, &ss->ss_client_seq->lcs_exp,
373                                       NULL, NULL);
374         if (rc != 0)
375                 GOTO(out_free, rc);
376
377         rc = lustre_register_lwp_item(lwp_name,
378                                       &ss->ss_server_fld->lsf_control_exp,
379                                       NULL, NULL);
380         if (rc != 0) {
381                 lustre_deregister_lwp_item(&ss->ss_client_seq->lcs_exp);
382                 ss->ss_client_seq->lcs_exp = NULL;
383                 GOTO(out_free, rc);
384         }
385 out_free:
386         if (lwp_name != NULL)
387                 OBD_FREE(lwp_name, MAX_OBD_NAME);
388
389         return rc;
390 }
391
392 /* object sequence management */
393 int ofd_seqs_init(const struct lu_env *env, struct ofd_device *ofd)
394 {
395         int rc;
396
397         rc = ofd_fid_init(env, ofd);
398         if (rc != 0) {
399                 CERROR("%s: fid init error: rc = %d\n", ofd_name(ofd), rc);
400                 return rc;
401         }
402
403         rc = ofd_fld_init(env, ofd_name(ofd), ofd);
404         if (rc) {
405                 CERROR("%s: Can't init fld, rc %d\n", ofd_name(ofd), rc);
406                 return rc;
407         }
408
409         rc = ofd_register_seq_exp(ofd);
410         if (rc) {
411                 CERROR("%s: Can't init seq exp, rc %d\n", ofd_name(ofd), rc);
412                 return rc;
413         }
414
415         rwlock_init(&ofd->ofd_seq_list_lock);
416         CFS_INIT_LIST_HEAD(&ofd->ofd_seq_list);
417         ofd->ofd_seq_count = 0;
418         return rc;
419 }
420
421 int ofd_clients_data_init(const struct lu_env *env, struct ofd_device *ofd,
422                           unsigned long fsize)
423 {
424         struct obd_device               *obd = ofd_obd(ofd);
425         struct lr_server_data           *lsd = &ofd->ofd_lut.lut_lsd;
426         struct lsd_client_data          *lcd = NULL;
427         struct filter_export_data       *fed;
428         int                              cl_idx;
429         int                              rc = 0;
430         loff_t                           off = lsd->lsd_client_start;
431
432         CLASSERT(offsetof(struct lsd_client_data, lcd_padding) +
433                  sizeof(lcd->lcd_padding) == LR_CLIENT_SIZE);
434
435         OBD_ALLOC_PTR(lcd);
436         if (lcd == NULL)
437                 RETURN(-ENOMEM);
438
439         for (cl_idx = 0; off < fsize; cl_idx++) {
440                 struct obd_export       *exp;
441                 __u64                    last_rcvd;
442
443                 /* Don't assume off is incremented properly by
444                  * fsfilt_read_record(), in case sizeof(*lcd)
445                  * isn't the same as fsd->lsd_client_size.  */
446                 off = lsd->lsd_client_start + cl_idx * lsd->lsd_client_size;
447                 rc = tgt_client_data_read(env, &ofd->ofd_lut, lcd, &off, cl_idx);
448                 if (rc) {
449                         CERROR("%s: error reading FILT %s idx %d off %llu: "
450                                "rc = %d\n", ofd_name(ofd), LAST_RCVD, cl_idx,
451                                off, rc);
452                         rc = 0;
453                         break; /* read error shouldn't cause startup to fail */
454                 }
455
456                 if (lcd->lcd_uuid[0] == '\0') {
457                         CDEBUG(D_INFO, "skipping zeroed client at offset %d\n",
458                                cl_idx);
459                         continue;
460                 }
461
462                 last_rcvd = lcd->lcd_last_transno;
463
464                 /* These exports are cleaned up by ofd_disconnect(), so they
465                  * need to be set up like real exports as ofd_connect() does.
466                  */
467                 exp = class_new_export(obd, (struct obd_uuid *)lcd->lcd_uuid);
468
469                 CDEBUG(D_HA, "RCVRNG CLIENT uuid: %s idx: %d lr: "LPU64
470                        " srv lr: "LPU64"\n", lcd->lcd_uuid, cl_idx,
471                        last_rcvd, lsd->lsd_last_transno);
472
473                 if (IS_ERR(exp)) {
474                         if (PTR_ERR(exp) == -EALREADY) {
475                                 /* export already exists, zero out this one */
476                                 CERROR("%s: Duplicate export %s!\n",
477                                        ofd_name(ofd), lcd->lcd_uuid);
478                                 continue;
479                         }
480                         GOTO(err_out, rc = PTR_ERR(exp));
481                 }
482
483                 fed = &exp->exp_filter_data;
484                 *fed->fed_ted.ted_lcd = *lcd;
485
486                 rc = tgt_client_add(env, exp, cl_idx);
487                 LASSERTF(rc == 0, "rc = %d\n", rc); /* can't fail existing */
488                 /* VBR: set export last committed version */
489                 exp->exp_last_committed = last_rcvd;
490                 spin_lock(&exp->exp_lock);
491                 exp->exp_connecting = 0;
492                 exp->exp_in_recovery = 0;
493                 spin_unlock(&exp->exp_lock);
494                 obd->obd_max_recoverable_clients++;
495                 class_export_put(exp);
496
497                 /* Need to check last_rcvd even for duplicated exports. */
498                 CDEBUG(D_OTHER, "client at idx %d has last_rcvd = "LPU64"\n",
499                        cl_idx, last_rcvd);
500
501                 spin_lock(&ofd->ofd_lut.lut_translock);
502                 if (last_rcvd > lsd->lsd_last_transno)
503                         lsd->lsd_last_transno = last_rcvd;
504                 spin_unlock(&ofd->ofd_lut.lut_translock);
505         }
506
507 err_out:
508         OBD_FREE_PTR(lcd);
509         RETURN(rc);
510 }
511
512 int ofd_server_data_init(const struct lu_env *env, struct ofd_device *ofd)
513 {
514         struct ofd_thread_info  *info = ofd_info(env);
515         struct lr_server_data   *lsd = &ofd->ofd_lut.lut_lsd;
516         struct obd_device       *obd = ofd_obd(ofd);
517         unsigned long           last_rcvd_size;
518         __u32                   index;
519         int                     rc;
520
521         rc = dt_attr_get(env, ofd->ofd_lut.lut_last_rcvd, &info->fti_attr,
522                          BYPASS_CAPA);
523         if (rc)
524                 RETURN(rc);
525
526         last_rcvd_size = (unsigned long)info->fti_attr.la_size;
527
528         /* ensure padding in the struct is the correct size */
529         CLASSERT (offsetof(struct lr_server_data, lsd_padding) +
530                   sizeof(lsd->lsd_padding) == LR_SERVER_SIZE);
531
532         rc = server_name2index(obd->obd_name, &index, NULL);
533         if (rc < 0) {
534                 CERROR("%s: Can not get index from obd_name: rc = %d\n",
535                        obd->obd_name, rc);
536                 RETURN(rc);
537         }
538
539         if (last_rcvd_size == 0) {
540                 LCONSOLE_WARN("%s: new disk, initializing\n", obd->obd_name);
541
542                 memcpy(lsd->lsd_uuid, obd->obd_uuid.uuid,
543                        sizeof(lsd->lsd_uuid));
544                 lsd->lsd_last_transno = 0;
545                 lsd->lsd_mount_count = 0;
546                 lsd->lsd_server_size = LR_SERVER_SIZE;
547                 lsd->lsd_client_start = LR_CLIENT_START;
548                 lsd->lsd_client_size = LR_CLIENT_SIZE;
549                 lsd->lsd_subdir_count = FILTER_SUBDIR_COUNT;
550                 lsd->lsd_feature_incompat = OBD_INCOMPAT_OST;
551                 lsd->lsd_osd_index = index;
552         } else {
553                 rc = tgt_server_data_read(env, &ofd->ofd_lut);
554                 if (rc) {
555                         CDEBUG(D_INODE,"OBD ofd: error reading %s: rc %d\n",
556                                LAST_RCVD, rc);
557                         GOTO(err_fsd, rc);
558                 }
559                 if (strcmp((char *)lsd->lsd_uuid,
560                            (char *)obd->obd_uuid.uuid)) {
561                         LCONSOLE_ERROR("Trying to start OBD %s using the wrong"
562                                        " disk %s. Were the /dev/ assignments "
563                                        "rearranged?\n",
564                                        obd->obd_uuid.uuid, lsd->lsd_uuid);
565                         GOTO(err_fsd, rc = -EINVAL);
566                 }
567
568                 if (lsd->lsd_osd_index == 0) {
569                         lsd->lsd_osd_index = index;
570                 } else if (lsd->lsd_osd_index != index) {
571                         LCONSOLE_ERROR("%s: index %d in last rcvd is different"
572                                        " with the index %d in config log."
573                                        " It might be disk corruption!\n",
574                                        obd->obd_name, lsd->lsd_osd_index,
575                                        index);
576                         GOTO(err_fsd, rc = -EINVAL);
577                 }
578         }
579
580         lsd->lsd_mount_count++;
581         obd->u.obt.obt_mount_count = lsd->lsd_mount_count;
582         obd->u.obt.obt_instance = (__u32)obd->u.obt.obt_mount_count;
583         ofd->ofd_subdir_count = lsd->lsd_subdir_count;
584
585         if (lsd->lsd_feature_incompat & ~OFD_INCOMPAT_SUPP) {
586                 CERROR("%s: unsupported incompat filesystem feature(s) %x\n",
587                        obd->obd_name,
588                        lsd->lsd_feature_incompat & ~OFD_INCOMPAT_SUPP);
589                 GOTO(err_fsd, rc = -EINVAL);
590         }
591         if (lsd->lsd_feature_rocompat & ~OFD_ROCOMPAT_SUPP) {
592                 CERROR("%s: unsupported read-only filesystem feature(s) %x\n",
593                        obd->obd_name,
594                        lsd->lsd_feature_rocompat & ~OFD_ROCOMPAT_SUPP);
595                 /* Do something like remount filesystem read-only */
596                 GOTO(err_fsd, rc = -EINVAL);
597         }
598
599         CDEBUG(D_INODE, "%s: server last_transno : "LPU64"\n",
600                obd->obd_name, lsd->lsd_last_transno);
601         CDEBUG(D_INODE, "%s: server mount_count: "LPU64"\n",
602                obd->obd_name, lsd->lsd_mount_count);
603         CDEBUG(D_INODE, "%s: server data size: %u\n",
604                obd->obd_name, lsd->lsd_server_size);
605         CDEBUG(D_INODE, "%s: per-client data start: %u\n",
606                obd->obd_name, lsd->lsd_client_start);
607         CDEBUG(D_INODE, "%s: per-client data size: %u\n",
608                obd->obd_name, lsd->lsd_client_size);
609         CDEBUG(D_INODE, "%s: server subdir_count: %u\n",
610                obd->obd_name, lsd->lsd_subdir_count);
611         CDEBUG(D_INODE, "%s: last_rcvd clients: %lu\n", obd->obd_name,
612                last_rcvd_size <= lsd->lsd_client_start ? 0 :
613                (last_rcvd_size - lsd->lsd_client_start) /
614                lsd->lsd_client_size);
615
616         if (!obd->obd_replayable)
617                 CWARN("%s: recovery support OFF\n", obd->obd_name);
618
619         rc = ofd_clients_data_init(env, ofd, last_rcvd_size);
620
621         spin_lock(&ofd->ofd_lut.lut_translock);
622         obd->obd_last_committed = lsd->lsd_last_transno;
623         ofd->ofd_lut.lut_last_transno = lsd->lsd_last_transno;
624         spin_unlock(&ofd->ofd_lut.lut_translock);
625
626         /* save it, so mount count and last_transno is current */
627         rc = tgt_server_data_update(env, &ofd->ofd_lut, 0);
628         if (rc)
629                 GOTO(err_fsd, rc);
630
631         RETURN(0);
632
633 err_fsd:
634         class_disconnect_exports(obd);
635         RETURN(rc);
636 }
637
638 int ofd_fs_setup(const struct lu_env *env, struct ofd_device *ofd,
639                  struct obd_device *obd)
640 {
641         struct ofd_thread_info  *info = ofd_info(env);
642         struct dt_object        *fo;
643         int                      rc = 0;
644
645         ENTRY;
646
647         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FS_SETUP))
648                 RETURN (-ENOENT);
649
650         /* prepare transactions callbacks */
651         ofd->ofd_txn_cb.dtc_txn_start = NULL;
652         ofd->ofd_txn_cb.dtc_txn_stop = ofd_txn_stop_cb;
653         ofd->ofd_txn_cb.dtc_txn_commit = NULL;
654         ofd->ofd_txn_cb.dtc_cookie = ofd;
655         ofd->ofd_txn_cb.dtc_tag = LCT_DT_THREAD;
656         CFS_INIT_LIST_HEAD(&ofd->ofd_txn_cb.dtc_linkage);
657
658         dt_txn_callback_add(ofd->ofd_osd, &ofd->ofd_txn_cb);
659
660         rc = ofd_server_data_init(env, ofd);
661         if (rc)
662                 GOTO(out, rc);
663
664         lu_local_obj_fid(&info->fti_fid, OFD_HEALTH_CHECK_OID);
665         memset(&info->fti_attr, 0, sizeof(info->fti_attr));
666         info->fti_attr.la_valid = LA_MODE;
667         info->fti_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
668         info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG);
669
670         fo = dt_find_or_create(env, ofd->ofd_osd, &info->fti_fid,
671                                &info->fti_dof, &info->fti_attr);
672         if (IS_ERR(fo))
673                 GOTO(out, rc = PTR_ERR(fo));
674
675         ofd->ofd_health_check_file = fo;
676
677         rc = ofd_seqs_init(env, ofd);
678         if (rc)
679                 GOTO(out_hc, rc);
680
681         RETURN(0);
682 out_hc:
683         lu_object_put(env, &ofd->ofd_health_check_file->do_lu);
684 out:
685         dt_txn_callback_del(ofd->ofd_osd, &ofd->ofd_txn_cb);
686         return rc;
687 }
688
689 void ofd_fs_cleanup(const struct lu_env *env, struct ofd_device *ofd)
690 {
691         int i;
692
693         ENTRY;
694
695         ofd_info_init(env, NULL);
696
697         ofd_seqs_fini(env, ofd);
698
699         i = dt_sync(env, ofd->ofd_osd);
700         if (i)
701                 CERROR("can't sync: %d\n", i);
702
703         /* Remove transaction callback */
704         dt_txn_callback_del(ofd->ofd_osd, &ofd->ofd_txn_cb);
705
706         if (ofd->ofd_health_check_file) {
707                 lu_object_put(env, &ofd->ofd_health_check_file->do_lu);
708                 ofd->ofd_health_check_file = NULL;
709         }
710
711         EXIT;
712 }
713