Whamcloud - gitweb
LU-4975 ofd: documenting the ofd_trans.c
[fs/lustre-release.git] / lustre / ofd / ofd_fs.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24  * Use is subject to license terms.
25  *
26  * Copyright (c) 2012, 2014 Intel Corporation.
27  */
28 /*
29  * This file is part of Lustre, http://www.lustre.org/
30  * Lustre is a trademark of Sun Microsystems, Inc.
31  *
32  * lustre/ofd/ofd_fs.c
33  *
34  * This file provides helper functions to handle various data stored on disk.
35  * It uses OSD API and works with any OSD.
36  *
37  * Note: this file contains also functions for sequence handling, they are
38  * placed here improperly and will be moved to the ofd_dev.c and ofd_internal.h,
39  * this comment is to be removed after that.
40  *
41  * Author: Alexey Zhuravlev <alexey.zhuravlev@intel.com>
42  * Author: Mikhail Pershin <mike.pershin@intel.com>
43  */
44
45 #define DEBUG_SUBSYSTEM S_FILTER
46
47 #include "ofd_internal.h"
48
49 /**
50  * Restrict precreate batch count by its upper limit.
51  *
52  * The precreate batch count is a number of precreates to do in
53  * single transaction. It has upper limit - ofd_device::ofd_precreate_batch
54  * value which shouldn't be exceeded.
55  *
56  * \param[in] ofd       OFD device
57  * \param[in] int       number of updates in the batch
58  *
59  * \retval              \a batch limited by ofd_device::ofd_precreate_batch
60  */
61 int ofd_precreate_batch(struct ofd_device *ofd, int batch)
62 {
63         int count;
64
65         spin_lock(&ofd->ofd_batch_lock);
66         count = min(ofd->ofd_precreate_batch, batch);
67         spin_unlock(&ofd->ofd_batch_lock);
68
69         return count;
70 }
71
72 /**
73  * Get ofd_seq for \a seq.
74  *
75  * Function finds appropriate structure by \a seq number and
76  * increases the reference counter of that structure.
77  *
78  * \param[in] ofd       OFD device
79  * \param[in] seq       sequence number, FID sequence number usually
80  *
81  * \retval              pointer to the requested ofd_seq structure
82  * \retval              NULL if ofd_seq is not found
83  */
84 struct ofd_seq *ofd_seq_get(struct ofd_device *ofd, obd_seq seq)
85 {
86         struct ofd_seq *oseq;
87
88         read_lock(&ofd->ofd_seq_list_lock);
89         list_for_each_entry(oseq, &ofd->ofd_seq_list, os_list) {
90                 if (ostid_seq(&oseq->os_oi) == seq) {
91                         atomic_inc(&oseq->os_refc);
92                         read_unlock(&ofd->ofd_seq_list_lock);
93                         return oseq;
94                 }
95         }
96         read_unlock(&ofd->ofd_seq_list_lock);
97         return NULL;
98 }
99
100 /**
101  * Drop a reference to ofd_seq.
102  *
103  * The paired function to the ofd_seq_get(). It decrease the reference counter
104  * of the ofd_seq structure and free it if that reference was last one.
105  *
106  * \param[in] env       execution environment
107  * \param[in] oseq      ofd_seq structure to put
108  */
109 void ofd_seq_put(const struct lu_env *env, struct ofd_seq *oseq)
110 {
111         if (atomic_dec_and_test(&oseq->os_refc)) {
112                 LASSERT(list_empty(&oseq->os_list));
113                 LASSERT(oseq->os_lastid_obj != NULL);
114                 lu_object_put(env, &oseq->os_lastid_obj->do_lu);
115                 OBD_FREE_PTR(oseq);
116         }
117 }
118
119 /**
120  * Add a new ofd_seq to the given OFD device.
121  *
122  * First it checks if there is already existent ofd_seq with the same
123  * sequence number as used by \a new_seq.
124  * If such ofd_seq is not found then the \a new_seq is added to the list
125  * of all ofd_seq structures else the \a new_seq is dropped and the found
126  * ofd_seq is returned back.
127  *
128  * \param[in] env       execution environment
129  * \param[in] ofd       OFD device
130  * \param[in] new_seq   new ofd_seq to be added
131  *
132  * \retval              ofd_seq structure
133  */
134 static struct ofd_seq *ofd_seq_add(const struct lu_env *env,
135                                    struct ofd_device *ofd,
136                                    struct ofd_seq *new_seq)
137 {
138         struct ofd_seq *os = NULL;
139
140         write_lock(&ofd->ofd_seq_list_lock);
141         list_for_each_entry(os, &ofd->ofd_seq_list, os_list) {
142                 if (ostid_seq(&os->os_oi) == ostid_seq(&new_seq->os_oi)) {
143                         atomic_inc(&os->os_refc);
144                         write_unlock(&ofd->ofd_seq_list_lock);
145                         /* The seq has not been added to the list */
146                         ofd_seq_put(env, new_seq);
147                         return os;
148                 }
149         }
150         atomic_inc(&new_seq->os_refc);
151         list_add_tail(&new_seq->os_list, &ofd->ofd_seq_list);
152         ofd->ofd_seq_count++;
153         write_unlock(&ofd->ofd_seq_list_lock);
154         return new_seq;
155 }
156
157 /**
158  * Get last object ID for the given sequence.
159  *
160  * \param[in] ofd_seq   OFD sequence structure
161  *
162  * \retval              the last object ID for this sequence
163  */
164 obd_id ofd_seq_last_oid(struct ofd_seq *oseq)
165 {
166         obd_id id;
167
168         spin_lock(&oseq->os_last_oid_lock);
169         id = ostid_id(&oseq->os_oi);
170         spin_unlock(&oseq->os_last_oid_lock);
171
172         return id;
173 }
174
175 /**
176  * Set new last object ID for the given sequence.
177  *
178  * \param[in] oseq      OFD sequence
179  * \param[in] id        the new OID to set
180  */
181 void ofd_seq_last_oid_set(struct ofd_seq *oseq, obd_id id)
182 {
183         spin_lock(&oseq->os_last_oid_lock);
184         if (likely(ostid_id(&oseq->os_oi) < id))
185                 ostid_set_id(&oseq->os_oi, id);
186         spin_unlock(&oseq->os_last_oid_lock);
187 }
188
189 /**
190  * Update last used OID on disk for the given sequence.
191  *
192  * The last used object ID is stored persistently on disk and
193  * must be written when updated. This function writes the sequence data.
194  * The format is just an object ID of the latest used object FID.
195  * Each ID is stored in per-sequence file.
196  *
197  * \param[in] env       execution environment
198  * \param[in] ofd       OFD device
199  * \param[in] oseq      ofd_seq structure with data to write
200  *
201  * \retval              0 on successful write of data from \a oseq
202  * \retval              negative value on error
203  */
204 int ofd_seq_last_oid_write(const struct lu_env *env, struct ofd_device *ofd,
205                            struct ofd_seq *oseq)
206 {
207         struct ofd_thread_info  *info = ofd_info(env);
208         obd_id                   tmp;
209         struct dt_object        *obj = oseq->os_lastid_obj;
210         struct thandle          *th;
211         int                      rc;
212
213         ENTRY;
214
215         tmp = cpu_to_le64(ofd_seq_last_oid(oseq));
216
217         info->fti_buf.lb_buf = &tmp;
218         info->fti_buf.lb_len = sizeof(tmp);
219         info->fti_off = 0;
220
221         LASSERT(obj != NULL);
222
223         th = dt_trans_create(env, ofd->ofd_osd);
224         if (IS_ERR(th))
225                 RETURN(PTR_ERR(th));
226
227         rc = dt_declare_record_write(env, obj, &info->fti_buf,
228                                      info->fti_off, th);
229         if (rc < 0)
230                 GOTO(out, rc);
231         rc = dt_trans_start_local(env, ofd->ofd_osd, th);
232         if (rc < 0)
233                 GOTO(out, rc);
234         rc = dt_record_write(env, obj, &info->fti_buf, &info->fti_off,
235                              th);
236         if (rc < 0)
237                 GOTO(out, rc);
238
239         CDEBUG(D_INODE, "%s: write last_objid "DOSTID": rc = %d\n",
240                ofd_name(ofd), POSTID(&oseq->os_oi), rc);
241         EXIT;
242 out:
243         dt_trans_stop(env, ofd->ofd_osd, th);
244         return rc;
245 }
246
247 /**
248  * Deregister LWP items for FLDB and SEQ client on OFD.
249  *
250  * LWP is lightweight proxy - simplified connection between
251  * servers. It is used for FID Location Database (FLDB) and
252  * sequence (SEQ) client-server interations.
253  *
254  * This function is used during server cleanup process to free
255  * LWP items that were previously set up upon OFD start.
256  *
257  * \param[in]     ofd   OFD device
258  */
259 static void ofd_deregister_seq_exp(struct ofd_device *ofd)
260 {
261         struct seq_server_site  *ss = &ofd->ofd_seq_site;
262
263         if (ss->ss_client_seq != NULL) {
264                 lustre_deregister_lwp_item(&ss->ss_client_seq->lcs_exp);
265                 ss->ss_client_seq->lcs_exp = NULL;
266         }
267
268         if (ss->ss_server_fld != NULL) {
269                 lustre_deregister_lwp_item(&ss->ss_server_fld->lsf_control_exp);
270                 ss->ss_server_fld->lsf_control_exp = NULL;
271         }
272 }
273
274 /**
275  * Stop FLDB server on OFD.
276  *
277  * This function is part of OFD cleanup process.
278  *
279  * \param[in] env       execution environment
280  * \param[in] ofd       OFD device
281  *
282  */
283 static void ofd_fld_fini(const struct lu_env *env, struct ofd_device *ofd)
284 {
285         struct seq_server_site *ss = &ofd->ofd_seq_site;
286
287         if (ss != NULL && ss->ss_server_fld != NULL) {
288                 fld_server_fini(env, ss->ss_server_fld);
289                 OBD_FREE_PTR(ss->ss_server_fld);
290                 ss->ss_server_fld = NULL;
291         }
292 }
293
294 /**
295  * Free sequence structures on OFD.
296  *
297  * This function is part of OFD cleanup process, it goes through
298  * the list of ofd_seq structures stored in ofd_device structure
299  * and frees them.
300  *
301  * \param[in] env       execution environment
302  * \param[in] ofd       OFD device
303  */
304 void ofd_seqs_free(const struct lu_env *env, struct ofd_device *ofd)
305 {
306         struct ofd_seq          *oseq;
307         struct ofd_seq          *tmp;
308         struct list_head         dispose;
309
310         INIT_LIST_HEAD(&dispose);
311         write_lock(&ofd->ofd_seq_list_lock);
312         list_for_each_entry_safe(oseq, tmp, &ofd->ofd_seq_list, os_list)
313                 list_move(&oseq->os_list, &dispose);
314         write_unlock(&ofd->ofd_seq_list_lock);
315
316         while (!list_empty(&dispose)) {
317                 oseq = container_of0(dispose.next, struct ofd_seq, os_list);
318                 list_del_init(&oseq->os_list);
319                 ofd_seq_put(env, oseq);
320         }
321 }
322
323 /**
324  * Stop FLDB and SEQ services on OFD.
325  *
326  * This function is part of OFD cleanup process.
327  *
328  * \param[in] env       execution environment
329  * \param[in] ofd       OFD device
330  *
331  */
332 void ofd_seqs_fini(const struct lu_env *env, struct ofd_device *ofd)
333 {
334         int rc;
335
336         ofd_deregister_seq_exp(ofd);
337
338         rc = ofd_fid_fini(env, ofd);
339         if (rc != 0)
340                 CERROR("%s: fid fini error: rc = %d\n", ofd_name(ofd), rc);
341
342         ofd_fld_fini(env, ofd);
343
344         ofd_seqs_free(env, ofd);
345
346         LASSERT(list_empty(&ofd->ofd_seq_list));
347 }
348
349 /**
350  * Return ofd_seq structure filled with valid data.
351  *
352  * This function gets the ofd_seq by sequence number and read
353  * corresponding data from disk.
354  *
355  * \param[in] env       execution environment
356  * \param[in] ofd       OFD device
357  * \param[in] seq       sequence number
358  *
359  * \retval              ofd_seq structure filled with data
360  * \retval              ERR_PTR pointer on error
361  */
362 struct ofd_seq *ofd_seq_load(const struct lu_env *env, struct ofd_device *ofd,
363                              obd_seq seq)
364 {
365         struct ofd_thread_info  *info = ofd_info(env);
366         struct ofd_seq          *oseq = NULL;
367         struct dt_object        *dob;
368         obd_id                   lastid;
369         int                      rc;
370
371         ENTRY;
372
373         /* if seq is already initialized */
374         oseq = ofd_seq_get(ofd, seq);
375         if (oseq != NULL)
376                 RETURN(oseq);
377
378         OBD_ALLOC_PTR(oseq);
379         if (oseq == NULL)
380                 RETURN(ERR_PTR(-ENOMEM));
381
382         lu_last_id_fid(&info->fti_fid, seq, ofd->ofd_lut.lut_lsd.lsd_osd_index);
383         memset(&info->fti_attr, 0, sizeof(info->fti_attr));
384         info->fti_attr.la_valid = LA_MODE;
385         info->fti_attr.la_mode = S_IFREG |  S_IRUGO | S_IWUSR;
386         info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG);
387
388         /* create object tracking per-seq last created
389          * id to be used by orphan recovery mechanism */
390         dob = dt_find_or_create(env, ofd->ofd_osd, &info->fti_fid,
391                                 &info->fti_dof, &info->fti_attr);
392         if (IS_ERR(dob)) {
393                 OBD_FREE_PTR(oseq);
394                 RETURN((void *)dob);
395         }
396
397         oseq->os_lastid_obj = dob;
398
399         INIT_LIST_HEAD(&oseq->os_list);
400         mutex_init(&oseq->os_create_lock);
401         spin_lock_init(&oseq->os_last_oid_lock);
402         ostid_set_seq(&oseq->os_oi, seq);
403
404         atomic_set(&oseq->os_refc, 1);
405
406         rc = dt_attr_get(env, dob, &info->fti_attr, BYPASS_CAPA);
407         if (rc)
408                 GOTO(cleanup, rc);
409
410         if (info->fti_attr.la_size == 0) {
411                 /* object is just created, initialize last id */
412                 ofd_seq_last_oid_set(oseq, OFD_INIT_OBJID);
413                 ofd_seq_last_oid_write(env, ofd, oseq);
414         } else if (info->fti_attr.la_size == sizeof(lastid)) {
415                 info->fti_off = 0;
416                 info->fti_buf.lb_buf = &lastid;
417                 info->fti_buf.lb_len = sizeof(lastid);
418
419                 rc = dt_record_read(env, dob, &info->fti_buf, &info->fti_off);
420                 if (rc) {
421                         CERROR("%s: can't read last_id: rc = %d\n",
422                                 ofd_name(ofd), rc);
423                         GOTO(cleanup, rc);
424                 }
425                 ofd_seq_last_oid_set(oseq, le64_to_cpu(lastid));
426         } else {
427                 CERROR("%s: corrupted size "LPU64" LAST_ID of seq "LPX64"\n",
428                         ofd_name(ofd), (__u64)info->fti_attr.la_size, seq);
429                 GOTO(cleanup, rc = -EINVAL);
430         }
431
432         oseq = ofd_seq_add(env, ofd, oseq);
433         RETURN((oseq != NULL) ? oseq : ERR_PTR(-ENOENT));
434 cleanup:
435         ofd_seq_put(env, oseq);
436         return ERR_PTR(rc);
437 }
438
439 /**
440  * initialize local FLDB server.
441  *
442  * \param[in] env       execution environment
443  * \param[in] uuid      unique name for this FLDS server
444  * \param[in] ofd       OFD device
445  *
446  * \retval              0 on successful initialization
447  * \retval              negative value on error
448  */
449 static int ofd_fld_init(const struct lu_env *env, const char *uuid,
450                         struct ofd_device *ofd)
451 {
452         struct seq_server_site *ss = &ofd->ofd_seq_site;
453         int rc;
454
455         ENTRY;
456
457         OBD_ALLOC_PTR(ss->ss_server_fld);
458         if (ss->ss_server_fld == NULL)
459                 RETURN(rc = -ENOMEM);
460
461         rc = fld_server_init(env, ss->ss_server_fld, ofd->ofd_osd, uuid,
462                              LU_SEQ_RANGE_OST);
463         if (rc < 0) {
464                 OBD_FREE_PTR(ss->ss_server_fld);
465                 ss->ss_server_fld = NULL;
466                 RETURN(rc);
467         }
468         RETURN(0);
469 }
470
471 /**
472  * Update local FLDB copy from master server.
473  *
474  * This callback is called when LWP is connected to the server.
475  * It retrieves its FLDB entries from MDT0, and it only happens
476  * when upgrading the existing file system to 2.6.
477  *
478  * \param[in] data      OFD device
479  *
480  * \retval              0 on successful FLDB update
481  * \retval              negative value in case if failure
482  */
483 static int ofd_register_lwp_callback(void *data)
484 {
485         struct lu_env           *env;
486         struct ofd_device       *ofd = data;
487         struct lu_server_fld    *fld = ofd->ofd_seq_site.ss_server_fld;
488         int                     rc;
489
490         ENTRY;
491
492         if (!likely(fld->lsf_new))
493                 RETURN(0);
494
495         OBD_ALLOC_PTR(env);
496         if (env == NULL)
497                 RETURN(-ENOMEM);
498
499         rc = lu_env_init(env, LCT_DT_THREAD);
500         if (rc < 0)
501                 GOTO(out, rc);
502
503         rc = fld_update_from_controller(env, fld);
504         if (rc < 0) {
505                 CERROR("%s: cannot update controller: rc = %d\n",
506                        ofd_name(ofd), rc);
507                 GOTO(out, rc);
508         }
509         EXIT;
510 out:
511         lu_env_fini(env);
512         OBD_FREE_PTR(env);
513         return rc;
514 }
515
516 /**
517  * Get LWP exports from LWP connection for local FLDB server and SEQ client.
518  *
519  * This function is part of setup process and initialize FLDB server and SEQ
520  * client, so they may work with remote servers.
521  *
522  * \param[in] ofd       OFD device
523  *
524  * \retval              0 on successful export get
525  * \retval              negative value on error
526  */
527 static int ofd_register_seq_exp(struct ofd_device *ofd)
528 {
529         struct seq_server_site  *ss = &ofd->ofd_seq_site;
530         char                    *lwp_name = NULL;
531         int                     rc;
532
533         OBD_ALLOC(lwp_name, MAX_OBD_NAME);
534         if (lwp_name == NULL)
535                 GOTO(out_free, rc = -ENOMEM);
536
537         rc = tgt_name2lwp_name(ofd_name(ofd), lwp_name, MAX_OBD_NAME, 0);
538         if (rc != 0)
539                 GOTO(out_free, rc);
540
541         rc = lustre_register_lwp_item(lwp_name, &ss->ss_client_seq->lcs_exp,
542                                       NULL, NULL);
543         if (rc != 0)
544                 GOTO(out_free, rc);
545
546         rc = lustre_register_lwp_item(lwp_name,
547                                       &ss->ss_server_fld->lsf_control_exp,
548                                       ofd_register_lwp_callback, ofd);
549         if (rc != 0) {
550                 lustre_deregister_lwp_item(&ss->ss_client_seq->lcs_exp);
551                 ss->ss_client_seq->lcs_exp = NULL;
552                 GOTO(out_free, rc);
553         }
554 out_free:
555         if (lwp_name != NULL)
556                 OBD_FREE(lwp_name, MAX_OBD_NAME);
557
558         return rc;
559 }
560
561 /**
562  * Initialize SEQ and FLD service on OFD.
563  *
564  * This is part of OFD setup process.
565  *
566  * \param[in] env       execution environment
567  * \param[in] ofd       OFD device
568  *
569  * \retval              0 on successful services initialization
570  * \retval              negative value on error
571  */
572 int ofd_seqs_init(const struct lu_env *env, struct ofd_device *ofd)
573 {
574         int rc;
575
576         rc = ofd_fid_init(env, ofd);
577         if (rc != 0) {
578                 CERROR("%s: fid init error: rc = %d\n", ofd_name(ofd), rc);
579                 return rc;
580         }
581
582         rc = ofd_fld_init(env, ofd_name(ofd), ofd);
583         if (rc < 0) {
584                 CERROR("%s: Can't init fld, rc %d\n", ofd_name(ofd), rc);
585                 return rc;
586         }
587
588         rc = ofd_register_seq_exp(ofd);
589         if (rc < 0) {
590                 CERROR("%s: Can't init seq exp, rc %d\n", ofd_name(ofd), rc);
591                 return rc;
592         }
593
594         rwlock_init(&ofd->ofd_seq_list_lock);
595         INIT_LIST_HEAD(&ofd->ofd_seq_list);
596         ofd->ofd_seq_count = 0;
597         return rc;
598 }
599
600 /**
601  * Initialize storage for the OFD.
602  *
603  * This function sets up service files for OFD. Currently, the only
604  * service file is "health_check".
605  *
606  * \param[in] env       execution environment
607  * \param[in] ofd       OFD device
608  * \param[in] obd       OBD device (unused now)
609  *
610  * \retval              0 on successful setup
611  * \retval              negative value on error
612  */
613 int ofd_fs_setup(const struct lu_env *env, struct ofd_device *ofd,
614                  struct obd_device *obd)
615 {
616         struct ofd_thread_info  *info = ofd_info(env);
617         struct dt_object        *fo;
618         int                      rc = 0;
619
620         ENTRY;
621
622         rc = ofd_seqs_init(env, ofd);
623         if (rc)
624                 GOTO(out_hc, rc);
625
626         if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FS_SETUP))
627                 RETURN (-ENOENT);
628
629         lu_local_obj_fid(&info->fti_fid, OFD_HEALTH_CHECK_OID);
630         memset(&info->fti_attr, 0, sizeof(info->fti_attr));
631         info->fti_attr.la_valid = LA_MODE;
632         info->fti_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
633         info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG);
634
635         fo = dt_find_or_create(env, ofd->ofd_osd, &info->fti_fid,
636                                &info->fti_dof, &info->fti_attr);
637         if (IS_ERR(fo))
638                 GOTO(out, rc = PTR_ERR(fo));
639
640         ofd->ofd_health_check_file = fo;
641
642         RETURN(0);
643 out_hc:
644         lu_object_put(env, &ofd->ofd_health_check_file->do_lu);
645 out:
646         return rc;
647 }
648
649 /**
650  * Cleanup service files on OFD.
651  *
652  * This function syncs whole OFD device and close "health check" file.
653  *
654  * \param[in] env       execution environment
655  * \param[in] ofd       OFD device
656  */
657 void ofd_fs_cleanup(const struct lu_env *env, struct ofd_device *ofd)
658 {
659         int rc;
660
661         ENTRY;
662
663         ofd_seqs_fini(env, ofd);
664
665         rc = dt_sync(env, ofd->ofd_osd);
666         if (rc < 0)
667                 CWARN("%s: can't sync OFD upon cleanup: %d\n",
668                       ofd_name(ofd), rc);
669
670         if (ofd->ofd_health_check_file) {
671                 lu_object_put(env, &ofd->ofd_health_check_file->do_lu);
672                 ofd->ofd_health_check_file = NULL;
673         }
674
675         EXIT;
676 }
677