Whamcloud - gitweb
LU-13636 obdclass: drop nlink if directory is removed
[fs/lustre-release.git] / lustre / ofd / ofd_fs.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2017, Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ofd/ofd_fs.c
33  *
34  * This file provides helper functions to handle various data stored on disk.
35  * It uses OSD API and works with any OSD.
36  *
37  * Note: this file contains also functions for sequence handling, they are
38  * placed here improperly and will be moved to the ofd_dev.c and ofd_internal.h,
39  * this comment is to be removed after that.
40  *
41  * Author: Alexey Zhuravlev <alexey.zhuravlev@intel.com>
42  * Author: Mikhail Pershin <mike.pershin@intel.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_FILTER
46
47 #include "ofd_internal.h"
48
49 /**
50  * Restrict precreate batch count by its upper limit.
51  *
52  * The precreate batch count is a number of precreates to do in
53  * single transaction. It has upper limit - ofd_device::ofd_precreate_batch
54  * value which shouldn't be exceeded.
55  *
56  * \param[in] ofd       OFD device
57  * \param[in] batch     number of updates in the batch
58  *
59  * \retval              \a batch limited by ofd_device::ofd_precreate_batch
60  */
61 int ofd_precreate_batch(struct ofd_device *ofd, int batch)
62 {
63         int count;
64
65         spin_lock(&ofd->ofd_batch_lock);
66         count = min(ofd->ofd_precreate_batch, batch);
67         spin_unlock(&ofd->ofd_batch_lock);
68
69         return count;
70 }
71
72 /**
73  * Get ofd_seq for \a seq.
74  *
75  * Function finds appropriate structure by \a seq number and
76  * increases the reference counter of that structure.
77  *
78  * \param[in] ofd       OFD device
79  * \param[in] seq       sequence number, FID sequence number usually
80  *
81  * \retval              pointer to the requested ofd_seq structure
82  * \retval              NULL if ofd_seq is not found
83  */
84 struct ofd_seq *ofd_seq_get(struct ofd_device *ofd, u64 seq)
85 {
86         struct ofd_seq *oseq;
87
88         read_lock(&ofd->ofd_seq_list_lock);
89         list_for_each_entry(oseq, &ofd->ofd_seq_list, os_list) {
90                 if (ostid_seq(&oseq->os_oi) == seq) {
91                         atomic_inc(&oseq->os_refc);
92                         read_unlock(&ofd->ofd_seq_list_lock);
93                         return oseq;
94                 }
95         }
96         read_unlock(&ofd->ofd_seq_list_lock);
97         return NULL;
98 }
99
100 /**
101  * Drop a reference to ofd_seq.
102  *
103  * The paired function to the ofd_seq_get(). It decrease the reference counter
104  * of the ofd_seq structure and free it if that reference was last one.
105  *
106  * \param[in] env       execution environment
107  * \param[in] oseq      ofd_seq structure to put
108  */
109 void ofd_seq_put(const struct lu_env *env, struct ofd_seq *oseq)
110 {
111         if (atomic_dec_and_test(&oseq->os_refc)) {
112                 LASSERT(list_empty(&oseq->os_list));
113                 LASSERT(oseq->os_lastid_obj != NULL);
114                 dt_object_put(env, oseq->os_lastid_obj);
115                 OBD_FREE_PTR(oseq);
116         }
117 }
118
119 /**
120  * Add a new ofd_seq to the given OFD device.
121  *
122  * First it checks if there is already existent ofd_seq with the same
123  * sequence number as used by \a new_seq.
124  * If such ofd_seq is not found then the \a new_seq is added to the list
125  * of all ofd_seq structures else the \a new_seq is dropped and the found
126  * ofd_seq is returned back.
127  *
128  * \param[in] env       execution environment
129  * \param[in] ofd       OFD device
130  * \param[in] new_seq   new ofd_seq to be added
131  *
132  * \retval              ofd_seq structure
133  */
134 static struct ofd_seq *ofd_seq_add(const struct lu_env *env,
135                                    struct ofd_device *ofd,
136                                    struct ofd_seq *new_seq)
137 {
138         struct ofd_seq *os = NULL;
139
140         write_lock(&ofd->ofd_seq_list_lock);
141         list_for_each_entry(os, &ofd->ofd_seq_list, os_list) {
142                 if (ostid_seq(&os->os_oi) == ostid_seq(&new_seq->os_oi)) {
143                         atomic_inc(&os->os_refc);
144                         write_unlock(&ofd->ofd_seq_list_lock);
145                         /* The seq has not been added to the list */
146                         ofd_seq_put(env, new_seq);
147                         return os;
148                 }
149         }
150         atomic_inc(&new_seq->os_refc);
151         list_add_tail(&new_seq->os_list, &ofd->ofd_seq_list);
152         ofd->ofd_seq_count++;
153         write_unlock(&ofd->ofd_seq_list_lock);
154         return new_seq;
155 }
156
157 /**
158  * Get last object ID for the given sequence.
159  *
160  * \param[in] oseq      OFD sequence structure
161  *
162  * \retval              the last object ID for this sequence
163  */
164 u64 ofd_seq_last_oid(struct ofd_seq *oseq)
165 {
166         u64 id;
167
168         spin_lock(&oseq->os_last_oid_lock);
169         id = ostid_id(&oseq->os_oi);
170         spin_unlock(&oseq->os_last_oid_lock);
171
172         return id;
173 }
174
175 /**
176  * Set new last object ID for the given sequence.
177  *
178  * \param[in] oseq      OFD sequence
179  * \param[in] id        the new OID to set
180  */
181 void ofd_seq_last_oid_set(struct ofd_seq *oseq, u64 id)
182 {
183         spin_lock(&oseq->os_last_oid_lock);
184         if (likely(ostid_id(&oseq->os_oi) < id)) {
185                 if (ostid_set_id(&oseq->os_oi, id)) {
186                         CERROR("Bad %llu to set " DOSTID "\n",
187                                (unsigned long long)id, POSTID(&oseq->os_oi));
188                 }
189         }
190         spin_unlock(&oseq->os_last_oid_lock);
191 }
192
193 /**
194  * Update last used OID on disk for the given sequence.
195  *
196  * The last used object ID is stored persistently on disk and
197  * must be written when updated. This function writes the sequence data.
198  * The format is just an object ID of the latest used object FID.
199  * Each ID is stored in per-sequence file.
200  *
201  * \param[in] env       execution environment
202  * \param[in] ofd       OFD device
203  * \param[in] oseq      ofd_seq structure with data to write
204  *
205  * \retval              0 on successful write of data from \a oseq
206  * \retval              negative value on error
207  */
208 int ofd_seq_last_oid_write(const struct lu_env *env, struct ofd_device *ofd,
209                            struct ofd_seq *oseq)
210 {
211         struct ofd_thread_info  *info = ofd_info(env);
212         u64                      tmp;
213         struct dt_object        *obj = oseq->os_lastid_obj;
214         struct thandle          *th;
215         int                      rc;
216
217         ENTRY;
218
219         if (ofd->ofd_osd->dd_rdonly)
220                 RETURN(0);
221
222         tmp = cpu_to_le64(ofd_seq_last_oid(oseq));
223
224         info->fti_buf.lb_buf = &tmp;
225         info->fti_buf.lb_len = sizeof(tmp);
226         info->fti_off = 0;
227
228         LASSERT(obj != NULL);
229
230         th = dt_trans_create(env, ofd->ofd_osd);
231         if (IS_ERR(th))
232                 RETURN(PTR_ERR(th));
233
234         rc = dt_declare_record_write(env, obj, &info->fti_buf,
235                                      info->fti_off, th);
236         if (rc < 0)
237                 GOTO(out, rc);
238         rc = dt_trans_start_local(env, ofd->ofd_osd, th);
239         if (rc < 0)
240                 GOTO(out, rc);
241         rc = dt_record_write(env, obj, &info->fti_buf, &info->fti_off,
242                              th);
243         if (rc < 0)
244                 GOTO(out, rc);
245
246         CDEBUG(D_INODE, "%s: write last_objid "DOSTID": rc = %d\n",
247                ofd_name(ofd), POSTID(&oseq->os_oi), rc);
248         EXIT;
249 out:
250         dt_trans_stop(env, ofd->ofd_osd, th);
251         return rc;
252 }
253
254 /**
255  * Deregister LWP items for FLDB and SEQ client on OFD.
256  *
257  * LWP is lightweight proxy - simplified connection between
258  * servers. It is used for FID Location Database (FLDB) and
259  * sequence (SEQ) client-server interactions.
260  *
261  * This function is used during server cleanup process to free
262  * LWP items that were previously set up upon OFD start.
263  *
264  * \param[in]     ofd   OFD device
265  */
266 static void ofd_deregister_seq_exp(struct ofd_device *ofd)
267 {
268         struct seq_server_site  *ss = &ofd->ofd_seq_site;
269
270         if (ss->ss_client_seq != NULL) {
271                 lustre_deregister_lwp_item(&ss->ss_client_seq->lcs_exp);
272                 ss->ss_client_seq->lcs_exp = NULL;
273         }
274
275         if (ss->ss_server_fld != NULL) {
276                 lustre_deregister_lwp_item(&ss->ss_server_fld->lsf_control_exp);
277                 ss->ss_server_fld->lsf_control_exp = NULL;
278         }
279 }
280
281 /**
282  * Stop FLDB server on OFD.
283  *
284  * This function is part of OFD cleanup process.
285  *
286  * \param[in] env       execution environment
287  * \param[in] ofd       OFD device
288  *
289  */
290 static void ofd_fld_fini(const struct lu_env *env, struct ofd_device *ofd)
291 {
292         struct seq_server_site *ss = &ofd->ofd_seq_site;
293
294         if (ss != NULL && ss->ss_server_fld != NULL) {
295                 fld_server_fini(env, ss->ss_server_fld);
296                 OBD_FREE_PTR(ss->ss_server_fld);
297                 ss->ss_server_fld = NULL;
298         }
299 }
300
301 /**
302  * Free sequence structures on OFD.
303  *
304  * This function is part of OFD cleanup process, it goes through
305  * the list of ofd_seq structures stored in ofd_device structure
306  * and frees them.
307  *
308  * \param[in] env       execution environment
309  * \param[in] ofd       OFD device
310  */
311 void ofd_seqs_free(const struct lu_env *env, struct ofd_device *ofd)
312 {
313         struct ofd_seq          *oseq;
314         struct ofd_seq          *tmp;
315         struct list_head         dispose;
316
317         INIT_LIST_HEAD(&dispose);
318         write_lock(&ofd->ofd_seq_list_lock);
319         list_for_each_entry_safe(oseq, tmp, &ofd->ofd_seq_list, os_list)
320                 list_move(&oseq->os_list, &dispose);
321         write_unlock(&ofd->ofd_seq_list_lock);
322
323         while (!list_empty(&dispose)) {
324                 oseq = container_of0(dispose.next, struct ofd_seq, os_list);
325                 list_del_init(&oseq->os_list);
326                 ofd_seq_put(env, oseq);
327         }
328 }
329
330 /**
331  * Stop FLDB and SEQ services on OFD.
332  *
333  * This function is part of OFD cleanup process.
334  *
335  * \param[in] env       execution environment
336  * \param[in] ofd       OFD device
337  *
338  */
339 void ofd_seqs_fini(const struct lu_env *env, struct ofd_device *ofd)
340 {
341         int rc;
342
343         ofd_deregister_seq_exp(ofd);
344
345         rc = ofd_fid_fini(env, ofd);
346         if (rc != 0)
347                 CERROR("%s: fid fini error: rc = %d\n", ofd_name(ofd), rc);
348
349         ofd_fld_fini(env, ofd);
350
351         ofd_seqs_free(env, ofd);
352
353         LASSERT(list_empty(&ofd->ofd_seq_list));
354 }
355
356 /**
357  * Return ofd_seq structure filled with valid data.
358  *
359  * This function gets the ofd_seq by sequence number and read
360  * corresponding data from disk.
361  *
362  * \param[in] env       execution environment
363  * \param[in] ofd       OFD device
364  * \param[in] seq       sequence number
365  *
366  * \retval              ofd_seq structure filled with data
367  * \retval              ERR_PTR pointer on error
368  */
369 struct ofd_seq *ofd_seq_load(const struct lu_env *env, struct ofd_device *ofd,
370                              u64 seq)
371 {
372         struct ofd_thread_info  *info = ofd_info(env);
373         struct ofd_seq          *oseq = NULL;
374         struct dt_object        *dob;
375         u64                      lastid;
376         int                      rc;
377
378         ENTRY;
379
380         /* if seq is already initialized */
381         oseq = ofd_seq_get(ofd, seq);
382         if (oseq != NULL) {
383                 CDEBUG(D_TRACE, "%s: got sequence %#llx "DOSTID"\n",
384                        ofd_name(ofd), seq, POSTID(&oseq->os_oi));
385                 RETURN(oseq);
386         }
387
388         OBD_ALLOC_PTR(oseq);
389         if (oseq == NULL)
390                 RETURN(ERR_PTR(-ENOMEM));
391
392         lu_last_id_fid(&info->fti_fid, seq, ofd->ofd_lut.lut_lsd.lsd_osd_index);
393         memset(&info->fti_attr, 0, sizeof(info->fti_attr));
394         info->fti_attr.la_valid = LA_MODE;
395         info->fti_attr.la_mode = S_IFREG |  S_IRUGO | S_IWUSR;
396         info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG);
397
398         /* create object tracking per-seq last created
399          * id to be used by orphan recovery mechanism */
400         dob = dt_find_or_create(env, ofd->ofd_osd, &info->fti_fid,
401                                 &info->fti_dof, &info->fti_attr);
402         if (IS_ERR(dob)) {
403                 OBD_FREE_PTR(oseq);
404                 RETURN((void *)dob);
405         }
406
407         oseq->os_lastid_obj = dob;
408
409         INIT_LIST_HEAD(&oseq->os_list);
410         mutex_init(&oseq->os_create_lock);
411         spin_lock_init(&oseq->os_last_oid_lock);
412         ostid_set_seq(&oseq->os_oi, seq);
413
414         atomic_set(&oseq->os_refc, 1);
415         atomic_set(&oseq->os_precreate_in_progress, 0);
416
417         rc = dt_attr_get(env, dob, &info->fti_attr);
418         if (rc)
419                 GOTO(cleanup, rc);
420
421         if (info->fti_attr.la_size == 0) {
422                 /* object is just created, initialize last id */
423                 if (OBD_FAIL_CHECK(OBD_FAIL_OFD_SET_OID))
424                         ofd_seq_last_oid_set(oseq, 0xffffff00);
425                 else
426                         ofd_seq_last_oid_set(oseq, OFD_INIT_OBJID);
427                 ofd_seq_last_oid_write(env, ofd, oseq);
428         } else if (info->fti_attr.la_size == sizeof(lastid)) {
429                 info->fti_off = 0;
430                 info->fti_buf.lb_buf = &lastid;
431                 info->fti_buf.lb_len = sizeof(lastid);
432
433                 rc = dt_record_read(env, dob, &info->fti_buf, &info->fti_off);
434                 if (rc) {
435                         CERROR("%s: can't read last_id: rc = %d\n",
436                                 ofd_name(ofd), rc);
437                         GOTO(cleanup, rc);
438                 }
439                 ofd_seq_last_oid_set(oseq, le64_to_cpu(lastid));
440         } else {
441                 CERROR("%s: corrupted size %llu LAST_ID of seq %#llx\n",
442                         ofd_name(ofd), (__u64)info->fti_attr.la_size, seq);
443                 GOTO(cleanup, rc = -EINVAL);
444         }
445
446         CDEBUG(D_HA, "%s: adding sequence %#llx\n", ofd_name(ofd), seq);
447
448         oseq = ofd_seq_add(env, ofd, oseq);
449         RETURN((oseq != NULL) ? oseq : ERR_PTR(-ENOENT));
450 cleanup:
451         ofd_seq_put(env, oseq);
452         return ERR_PTR(rc);
453 }
454
455 /**
456  * initialize local FLDB server.
457  *
458  * \param[in] env       execution environment
459  * \param[in] uuid      unique name for this FLDS server
460  * \param[in] ofd       OFD device
461  *
462  * \retval              0 on successful initialization
463  * \retval              negative value on error
464  */
465 static int ofd_fld_init(const struct lu_env *env, const char *uuid,
466                         struct ofd_device *ofd)
467 {
468         struct seq_server_site *ss = &ofd->ofd_seq_site;
469         int rc;
470
471         ENTRY;
472
473         OBD_ALLOC_PTR(ss->ss_server_fld);
474         if (ss->ss_server_fld == NULL)
475                 RETURN(rc = -ENOMEM);
476
477         rc = fld_server_init(env, ss->ss_server_fld, ofd->ofd_osd, uuid,
478                              LU_SEQ_RANGE_OST);
479         if (rc < 0) {
480                 OBD_FREE_PTR(ss->ss_server_fld);
481                 ss->ss_server_fld = NULL;
482                 RETURN(rc);
483         }
484         RETURN(0);
485 }
486
487 /**
488  * Update local FLDB copy from master server.
489  *
490  * This callback is called when LWP is connected to the server.
491  * It retrieves its FLDB entries from MDT0, and it only happens
492  * when upgrading the existing file system to 2.6.
493  *
494  * \param[in] data      OFD device
495  *
496  * \retval              0 on successful FLDB update
497  * \retval              negative value in case if failure
498  */
499 static int ofd_register_lwp_callback(void *data)
500 {
501         struct lu_env           *env;
502         struct ofd_device       *ofd = data;
503         struct lu_server_fld    *fld = ofd->ofd_seq_site.ss_server_fld;
504         int                     rc;
505
506         ENTRY;
507
508         if (!likely(fld->lsf_new))
509                 RETURN(0);
510
511         OBD_ALLOC_PTR(env);
512         if (env == NULL)
513                 RETURN(-ENOMEM);
514
515         rc = lu_env_init(env, LCT_DT_THREAD);
516         if (rc < 0)
517                 GOTO(out, rc);
518
519         rc = fld_update_from_controller(env, fld);
520         if (rc < 0) {
521                 CERROR("%s: cannot update controller: rc = %d\n",
522                        ofd_name(ofd), rc);
523                 GOTO(out, rc);
524         }
525         EXIT;
526 out:
527         lu_env_fini(env);
528         OBD_FREE_PTR(env);
529         return rc;
530 }
531
532 /**
533  * Get LWP exports from LWP connection for local FLDB server and SEQ client.
534  *
535  * This function is part of setup process and initialize FLDB server and SEQ
536  * client, so they may work with remote servers.
537  *
538  * \param[in] ofd       OFD device
539  *
540  * \retval              0 on successful export get
541  * \retval              negative value on error
542  */
543 static int ofd_register_seq_exp(struct ofd_device *ofd)
544 {
545         struct seq_server_site  *ss = &ofd->ofd_seq_site;
546         char                    *lwp_name = NULL;
547         int                     rc;
548
549         OBD_ALLOC(lwp_name, MAX_OBD_NAME);
550         if (lwp_name == NULL)
551                 GOTO(out_free, rc = -ENOMEM);
552
553         rc = tgt_name2lwp_name(ofd_name(ofd), lwp_name, MAX_OBD_NAME, 0);
554         if (rc != 0)
555                 GOTO(out_free, rc);
556
557         rc = lustre_register_lwp_item(lwp_name, &ss->ss_client_seq->lcs_exp,
558                                       NULL, NULL);
559         if (rc != 0)
560                 GOTO(out_free, rc);
561
562         rc = lustre_register_lwp_item(lwp_name,
563                                       &ss->ss_server_fld->lsf_control_exp,
564                                       ofd_register_lwp_callback, ofd);
565         if (rc != 0) {
566                 lustre_deregister_lwp_item(&ss->ss_client_seq->lcs_exp);
567                 ss->ss_client_seq->lcs_exp = NULL;
568                 GOTO(out_free, rc);
569         }
570 out_free:
571         if (lwp_name != NULL)
572                 OBD_FREE(lwp_name, MAX_OBD_NAME);
573
574         return rc;
575 }
576
577 /**
578  * Initialize SEQ and FLD service on OFD.
579  *
580  * This is part of OFD setup process.
581  *
582  * \param[in] env       execution environment
583  * \param[in] ofd       OFD device
584  *
585  * \retval              0 on successful services initialization
586  * \retval              negative value on error
587  */
588 int ofd_seqs_init(const struct lu_env *env, struct ofd_device *ofd)
589 {
590         int rc;
591
592         rwlock_init(&ofd->ofd_seq_list_lock);
593         INIT_LIST_HEAD(&ofd->ofd_seq_list);
594         ofd->ofd_seq_count = 0;
595
596         rc = ofd_fid_init(env, ofd);
597         if (rc != 0) {
598                 CERROR("%s: fid init error: rc = %d\n", ofd_name(ofd), rc);
599                 GOTO(out, rc);
600         }
601
602         rc = ofd_fld_init(env, ofd_name(ofd), ofd);
603         if (rc < 0) {
604                 CERROR("%s: Can't init fld, rc %d\n", ofd_name(ofd), rc);
605                 GOTO(out_fid, rc);
606         }
607
608         rc = ofd_register_seq_exp(ofd);
609         if (rc < 0) {
610                 CERROR("%s: Can't init seq exp, rc %d\n", ofd_name(ofd), rc);
611                 GOTO(out_fld, rc);
612         }
613
614         RETURN(0);
615
616 out_fld:
617         ofd_fld_fini(env, ofd);
618 out_fid:
619         ofd_fid_fini(env, ofd);
620 out:
621         return rc;
622 }
623
624 /**
625  * Initialize storage for the OFD.
626  *
627  * This function sets up service files for OFD. Currently, the only
628  * service file is "health_check".
629  *
630  * \param[in] env       execution environment
631  * \param[in] ofd       OFD device
632  * \param[in] obd       OBD device (unused now)
633  *
634  * \retval              0 on successful setup
635  * \retval              negative value on error
636  */
637 int ofd_fs_setup(const struct lu_env *env, struct ofd_device *ofd,
638                  struct obd_device *obd)
639 {
640         struct ofd_thread_info  *info = ofd_info(env);
641         struct dt_object        *fo;
642         int                      rc = 0;
643
644         ENTRY;
645
646         rc = ofd_seqs_init(env, ofd);
647         if (rc)
648                 GOTO(out, rc);
649
650         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FS_SETUP))
651                 GOTO(out_seqs, rc = -ENOENT);
652
653         lu_local_obj_fid(&info->fti_fid, OFD_HEALTH_CHECK_OID);
654         memset(&info->fti_attr, 0, sizeof(info->fti_attr));
655         info->fti_attr.la_valid = LA_MODE;
656         info->fti_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
657         info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG);
658
659         fo = dt_find_or_create(env, ofd->ofd_osd, &info->fti_fid,
660                                &info->fti_dof, &info->fti_attr);
661         if (IS_ERR(fo))
662                 GOTO(out_seqs, rc = PTR_ERR(fo));
663
664         ofd->ofd_health_check_file = fo;
665
666         RETURN(0);
667
668 out_seqs:
669         ofd_seqs_fini(env, ofd);
670 out:
671         return rc;
672 }
673
674 /**
675  * Cleanup service files on OFD.
676  *
677  * This function syncs whole OFD device and close "health check" file.
678  *
679  * \param[in] env       execution environment
680  * \param[in] ofd       OFD device
681  */
682 void ofd_fs_cleanup(const struct lu_env *env, struct ofd_device *ofd)
683 {
684         int rc;
685
686         ENTRY;
687
688         ofd_seqs_fini(env, ofd);
689
690         rc = dt_sync(env, ofd->ofd_osd);
691         if (rc < 0)
692                 CWARN("%s: can't sync OFD upon cleanup: %d\n",
693                       ofd_name(ofd), rc);
694
695         if (ofd->ofd_health_check_file) {
696                 dt_object_put(env, ofd->ofd_health_check_file);
697                 ofd->ofd_health_check_file = NULL;
698         }
699
700         EXIT;
701 }
702