4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.gnu.org/licenses/gpl-2.0.html
23 * Copyright (c) 2009, 2010, Oracle and/or its affiliates. All rights reserved.
24 * Use is subject to license terms.
26 * Copyright (c) 2012, 2017, Intel Corporation.
29 * This file is part of Lustre, http://www.lustre.org/
33 * This file provides helper functions to handle various data stored on disk.
34 * It uses OSD API and works with any OSD.
36 * Note: this file also contains functions for sequence handling; they are
37 * misplaced here and will be moved to ofd_dev.c and ofd_internal.h.
38 * This comment is to be removed after that.
40 * Author: Alexey Zhuravlev <alexey.zhuravlev@intel.com>
41 * Author: Mikhail Pershin <mike.pershin@intel.com>
44 #define DEBUG_SUBSYSTEM S_FILTER
46 #include "ofd_internal.h"
49 * Restrict precreate batch count by its upper limit.
51 * The precreate batch count is a number of precreates to do in
52 * single transaction. It has upper limit - ofd_device::ofd_precreate_batch
53 * value which shouldn't be exceeded.
55 * \param[in] ofd OFD device
56 * \param[in] batch number of updates in the batch
58 * \retval \a batch limited by ofd_device::ofd_precreate_batch
60 int ofd_precreate_batch(struct ofd_device *ofd, int batch)
64 spin_lock(&ofd->ofd_batch_lock);
65 count = min(ofd->ofd_precreate_batch, batch);
66 spin_unlock(&ofd->ofd_batch_lock);
72 * Get ofd_seq for \a seq.
74 * Function finds appropriate structure by \a seq number and
75 * increases the reference counter of that structure.
77 * \param[in] ofd OFD device
78 * \param[in] seq sequence number, FID sequence number usually
80 * \retval pointer to the requested ofd_seq structure
81 * \retval NULL if ofd_seq is not found
83 struct ofd_seq *ofd_seq_get(struct ofd_device *ofd, u64 seq)
87 read_lock(&ofd->ofd_seq_list_lock);
88 list_for_each_entry(oseq, &ofd->ofd_seq_list, os_list) {
89 if (ostid_seq(&oseq->os_oi) == seq) {
90 atomic_inc(&oseq->os_refc);
91 read_unlock(&ofd->ofd_seq_list_lock);
95 read_unlock(&ofd->ofd_seq_list_lock);
100 * Drop a reference to ofd_seq.
102 * The paired function to the ofd_seq_get(). It decrease the reference counter
103 * of the ofd_seq structure and free it if that reference was last one.
105 * \param[in] env execution environment
106 * \param[in] oseq ofd_seq structure to put
108 void ofd_seq_put(const struct lu_env *env, struct ofd_seq *oseq)
110 if (atomic_dec_and_test(&oseq->os_refc)) {
111 LASSERT(list_empty(&oseq->os_list));
112 LASSERT(oseq->os_lastid_obj != NULL);
113 dt_object_put(env, oseq->os_lastid_obj);
119 * Add a new ofd_seq to the given OFD device.
121 * First it checks if there is already existent ofd_seq with the same
122 * sequence number as used by \a new_seq.
123 * If such ofd_seq is not found then the \a new_seq is added to the list
124 * of all ofd_seq structures else the \a new_seq is dropped and the found
125 * ofd_seq is returned back.
127 * \param[in] env execution environment
128 * \param[in] ofd OFD device
129 * \param[in] new_seq new ofd_seq to be added
131 * \retval ofd_seq structure
133 static struct ofd_seq *ofd_seq_add(const struct lu_env *env,
134 struct ofd_device *ofd,
135 struct ofd_seq *new_seq)
137 struct ofd_seq *os = NULL;
139 write_lock(&ofd->ofd_seq_list_lock);
140 list_for_each_entry(os, &ofd->ofd_seq_list, os_list) {
141 if (ostid_seq(&os->os_oi) == ostid_seq(&new_seq->os_oi)) {
142 atomic_inc(&os->os_refc);
143 write_unlock(&ofd->ofd_seq_list_lock);
144 /* The seq has not been added to the list */
145 ofd_seq_put(env, new_seq);
149 atomic_inc(&new_seq->os_refc);
150 list_add_tail(&new_seq->os_list, &ofd->ofd_seq_list);
151 ofd->ofd_seq_count++;
152 write_unlock(&ofd->ofd_seq_list_lock);
157 * Get last object ID for the given sequence.
159 * \param[in] oseq OFD sequence structure
161 * \retval the last object ID for this sequence
163 u64 ofd_seq_last_oid(struct ofd_seq *oseq)
167 spin_lock(&oseq->os_last_oid_lock);
168 id = ostid_id(&oseq->os_oi);
169 spin_unlock(&oseq->os_last_oid_lock);
175 * Set new last object ID for the given sequence.
177 * \param[in] oseq OFD sequence
178 * \param[in] id the new OID to set
180 void ofd_seq_last_oid_set(struct ofd_seq *oseq, u64 id)
182 spin_lock(&oseq->os_last_oid_lock);
183 if (likely(ostid_id(&oseq->os_oi) < id)) {
184 if (ostid_set_id(&oseq->os_oi, id)) {
185 CERROR("Bad %llu to set " DOSTID "\n",
186 (unsigned long long)id, POSTID(&oseq->os_oi));
189 spin_unlock(&oseq->os_last_oid_lock);
193 * Update last used OID on disk for the given sequence.
195 * The last used object ID is stored persistently on disk and
196 * must be written when updated. This function writes the sequence data.
197 * The format is just an object ID of the latest used object FID.
198 * Each ID is stored in per-sequence file.
200 * \param[in] env execution environment
201 * \param[in] ofd OFD device
202 * \param[in] oseq ofd_seq structure with data to write
204 * \retval 0 on successful write of data from \a oseq
205 * \retval negative value on error
207 int ofd_seq_last_oid_write(const struct lu_env *env, struct ofd_device *ofd,
208 struct ofd_seq *oseq)
210 struct ofd_thread_info *info = ofd_info(env);
212 struct dt_object *obj = oseq->os_lastid_obj;
218 if (ofd->ofd_osd->dd_rdonly)
221 tmp = cpu_to_le64(ofd_seq_last_oid(oseq));
223 info->fti_buf.lb_buf = &tmp;
224 info->fti_buf.lb_len = sizeof(tmp);
227 LASSERT(obj != NULL);
229 th = dt_trans_create(env, ofd->ofd_osd);
233 rc = dt_declare_record_write(env, obj, &info->fti_buf,
237 rc = dt_trans_start_local(env, ofd->ofd_osd, th);
240 rc = dt_record_write(env, obj, &info->fti_buf, &info->fti_off,
245 CDEBUG(D_INODE, "%s: write last_objid "DOSTID": rc = %d\n",
246 ofd_name(ofd), POSTID(&oseq->os_oi), rc);
249 dt_trans_stop(env, ofd->ofd_osd, th);
254 * Deregister LWP items for FLDB and SEQ client on OFD.
256 * LWP is lightweight proxy - simplified connection between
257 * servers. It is used for FID Location Database (FLDB) and
258 * sequence (SEQ) client-server interactions.
260 * This function is used during server cleanup process to free
261 * LWP items that were previously set up upon OFD start.
263 * \param[in] ofd OFD device
265 static void ofd_deregister_seq_exp(struct ofd_device *ofd)
267 struct seq_server_site *ss = &ofd->ofd_seq_site;
269 if (ss->ss_client_seq != NULL) {
270 lustre_deregister_lwp_item(&ss->ss_client_seq->lcs_exp);
271 ss->ss_client_seq->lcs_exp = NULL;
274 if (ss->ss_server_fld != NULL) {
275 lustre_deregister_lwp_item(&ss->ss_server_fld->lsf_control_exp);
276 ss->ss_server_fld->lsf_control_exp = NULL;
281 * Stop FLDB server on OFD.
283 * This function is part of OFD cleanup process.
285 * \param[in] env execution environment
286 * \param[in] ofd OFD device
289 static void ofd_fld_fini(const struct lu_env *env, struct ofd_device *ofd)
291 struct seq_server_site *ss = &ofd->ofd_seq_site;
293 if (ss != NULL && ss->ss_server_fld != NULL) {
294 fld_server_fini(env, ss->ss_server_fld);
295 OBD_FREE_PTR(ss->ss_server_fld);
296 ss->ss_server_fld = NULL;
301 * Free sequence structures on OFD.
303 * This function is part of OFD cleanup process, it goes through
304 * the list of ofd_seq structures stored in ofd_device structure
307 * \param[in] env execution environment
308 * \param[in] ofd OFD device
310 void ofd_seqs_free(const struct lu_env *env, struct ofd_device *ofd)
312 struct ofd_seq *oseq;
316 write_lock(&ofd->ofd_seq_list_lock);
317 list_for_each_entry_safe(oseq, tmp, &ofd->ofd_seq_list, os_list)
318 list_move(&oseq->os_list, &dispose);
319 write_unlock(&ofd->ofd_seq_list_lock);
321 while (!list_empty(&dispose)) {
322 oseq = container_of(dispose.next, struct ofd_seq, os_list);
323 list_del_init(&oseq->os_list);
324 ofd_seq_put(env, oseq);
329 * Stop FLDB and SEQ services on OFD.
331 * This function is part of OFD cleanup process.
333 * \param[in] env execution environment
334 * \param[in] ofd OFD device
337 void ofd_seqs_fini(const struct lu_env *env, struct ofd_device *ofd)
341 ofd_deregister_seq_exp(ofd);
343 rc = ofd_fid_fini(env, ofd);
345 CERROR("%s: fid fini error: rc = %d\n", ofd_name(ofd), rc);
347 ofd_fld_fini(env, ofd);
349 ofd_seqs_free(env, ofd);
351 LASSERT(list_empty(&ofd->ofd_seq_list));
355 * Return ofd_seq structure filled with valid data.
357 * This function gets the ofd_seq by sequence number and read
358 * corresponding data from disk.
360 * \param[in] env execution environment
361 * \param[in] ofd OFD device
362 * \param[in] seq sequence number
364 * \retval ofd_seq structure filled with data
365 * \retval ERR_PTR pointer on error
367 struct ofd_seq *ofd_seq_load(const struct lu_env *env, struct ofd_device *ofd,
370 struct ofd_thread_info *info = ofd_info(env);
371 struct ofd_seq *oseq = NULL;
372 struct dt_object *dob;
378 /* if seq is already initialized */
379 oseq = ofd_seq_get(ofd, seq);
381 CDEBUG(D_TRACE, "%s: got sequence %#llx "DOSTID"\n",
382 ofd_name(ofd), seq, POSTID(&oseq->os_oi));
388 RETURN(ERR_PTR(-ENOMEM));
390 lu_last_id_fid(&info->fti_fid, seq, ofd->ofd_lut.lut_lsd.lsd_osd_index);
391 memset(&info->fti_attr, 0, sizeof(info->fti_attr));
392 info->fti_attr.la_valid = LA_MODE;
393 info->fti_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
394 info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG);
396 /* create object tracking per-seq last created
397 * id to be used by orphan recovery mechanism */
398 dob = dt_find_or_create(env, ofd->ofd_osd, &info->fti_fid,
399 &info->fti_dof, &info->fti_attr);
405 oseq->os_lastid_obj = dob;
407 INIT_LIST_HEAD(&oseq->os_list);
408 mutex_init(&oseq->os_create_lock);
409 spin_lock_init(&oseq->os_last_oid_lock);
410 ostid_set_seq(&oseq->os_oi, seq);
411 oseq->os_last_id_synced = 0;
413 atomic_set(&oseq->os_refc, 1);
414 atomic_set(&oseq->os_precreate_in_progress, 0);
416 rc = dt_attr_get(env, dob, &info->fti_attr);
420 if (info->fti_attr.la_size == 0) {
421 /* object is just created, initialize last id */
422 if (OBD_FAIL_CHECK(OBD_FAIL_OFD_SET_OID)) {
423 struct seq_server_site *ss = &ofd->ofd_seq_site;
424 struct lu_client_seq *client_seq = ss->ss_client_seq;
425 __u64 seq_width = fid_seq_is_norm(seq) ?
426 min(OBIF_MAX_OID, client_seq->lcs_width) :
427 min(IDIF_MAX_OID, client_seq->lcs_width);
429 ofd_seq_last_oid_set(oseq, seq_width & ~0xffULL);
431 ofd_seq_last_oid_set(oseq, OFD_INIT_OBJID);
433 ofd_seq_last_oid_write(env, ofd, oseq);
434 } else if (info->fti_attr.la_size == sizeof(lastid)) {
436 info->fti_buf.lb_buf = &lastid;
437 info->fti_buf.lb_len = sizeof(lastid);
439 rc = dt_record_read(env, dob, &info->fti_buf, &info->fti_off);
441 CERROR("%s: can't read last_id: rc = %d\n",
445 ofd_seq_last_oid_set(oseq, le64_to_cpu(lastid));
447 CERROR("%s: corrupted size %llu LAST_ID of seq %#llx\n",
448 ofd_name(ofd), (__u64)info->fti_attr.la_size, seq);
449 GOTO(cleanup, rc = -EINVAL);
452 CDEBUG(D_HA, "%s: adding sequence %#llx\n", ofd_name(ofd), seq);
454 oseq = ofd_seq_add(env, ofd, oseq);
455 RETURN((oseq != NULL) ? oseq : ERR_PTR(-ENOENT));
457 ofd_seq_put(env, oseq);
462 * initialize local FLDB server.
464 * \param[in] env execution environment
465 * \param[in] uuid unique name for this FLDS server
466 * \param[in] ofd OFD device
468 * \retval 0 on successful initialization
469 * \retval negative value on error
471 static int ofd_fld_init(const struct lu_env *env, const char *uuid,
472 struct ofd_device *ofd)
474 struct seq_server_site *ss = &ofd->ofd_seq_site;
479 OBD_ALLOC_PTR(ss->ss_server_fld);
480 if (ss->ss_server_fld == NULL)
481 RETURN(rc = -ENOMEM);
483 rc = fld_server_init(env, ss->ss_server_fld, ofd->ofd_osd, uuid,
486 OBD_FREE_PTR(ss->ss_server_fld);
487 ss->ss_server_fld = NULL;
494 * Update local FLDB copy from master server.
496 * This callback is called when LWP is connected to the server.
497 * It retrieves its FLDB entries from MDT0, and it only happens
498 * when upgrading the existing file system to 2.6.
500 * \param[in] data OFD device
502 * \retval 0 on successful FLDB update
503 * \retval negative value in case if failure
505 static int ofd_register_lwp_callback(void *data)
508 struct ofd_device *ofd = data;
509 struct lu_server_fld *fld = ofd->ofd_seq_site.ss_server_fld;
514 if (!likely(fld->lsf_new))
521 rc = lu_env_init(env, LCT_DT_THREAD);
525 rc = fld_update_from_controller(env, fld);
527 CERROR("%s: cannot update controller: rc = %d\n",
539 * Get LWP exports from LWP connection for local FLDB server and SEQ client.
541 * This function is part of setup process and initialize FLDB server and SEQ
542 * client, so they may work with remote servers.
544 * \param[in] ofd OFD device
546 * \retval 0 on successful export get
547 * \retval negative value on error
549 static int ofd_register_seq_exp(struct ofd_device *ofd)
551 struct seq_server_site *ss = &ofd->ofd_seq_site;
552 char *lwp_name = NULL;
555 OBD_ALLOC(lwp_name, MAX_OBD_NAME);
556 if (lwp_name == NULL)
557 GOTO(out_free, rc = -ENOMEM);
559 rc = tgt_name2lwp_name(ofd_name(ofd), lwp_name, MAX_OBD_NAME, 0);
563 rc = lustre_register_lwp_item(lwp_name, &ss->ss_client_seq->lcs_exp,
568 rc = lustre_register_lwp_item(lwp_name,
569 &ss->ss_server_fld->lsf_control_exp,
570 ofd_register_lwp_callback, ofd);
572 lustre_deregister_lwp_item(&ss->ss_client_seq->lcs_exp);
573 ss->ss_client_seq->lcs_exp = NULL;
577 if (lwp_name != NULL)
578 OBD_FREE(lwp_name, MAX_OBD_NAME);
584 * Initialize SEQ and FLD service on OFD.
586 * This is part of OFD setup process.
588 * \param[in] env execution environment
589 * \param[in] ofd OFD device
591 * \retval 0 on successful services initialization
592 * \retval negative value on error
594 int ofd_seqs_init(const struct lu_env *env, struct ofd_device *ofd)
598 rwlock_init(&ofd->ofd_seq_list_lock);
599 INIT_LIST_HEAD(&ofd->ofd_seq_list);
600 ofd->ofd_seq_count = 0;
602 rc = ofd_fid_init(env, ofd);
604 CERROR("%s: fid init error: rc = %d\n", ofd_name(ofd), rc);
608 rc = ofd_fld_init(env, ofd_name(ofd), ofd);
610 CERROR("%s: Can't init fld, rc %d\n", ofd_name(ofd), rc);
614 rc = ofd_register_seq_exp(ofd);
616 CERROR("%s: Can't init seq exp, rc %d\n", ofd_name(ofd), rc);
623 ofd_fld_fini(env, ofd);
625 ofd_fid_fini(env, ofd);
631 * Initialize storage for the OFD.
633 * This function sets up service files for OFD. Currently, the only
634 * service file is "health_check".
636 * \param[in] env execution environment
637 * \param[in] ofd OFD device
638 * \param[in] obd OBD device (unused now)
640 * \retval 0 on successful setup
641 * \retval negative value on error
643 int ofd_fs_setup(const struct lu_env *env, struct ofd_device *ofd,
644 struct obd_device *obd)
646 struct ofd_thread_info *info = ofd_info(env);
647 struct dt_object *fo;
652 rc = ofd_seqs_init(env, ofd);
656 if (OBD_FAIL_CHECK(OBD_FAIL_MDS_FS_SETUP))
657 GOTO(out_seqs, rc = -ENOENT);
659 lu_local_obj_fid(&info->fti_fid, OFD_HEALTH_CHECK_OID);
660 memset(&info->fti_attr, 0, sizeof(info->fti_attr));
661 info->fti_attr.la_valid = LA_MODE;
662 info->fti_attr.la_mode = S_IFREG | S_IRUGO | S_IWUSR;
663 info->fti_dof.dof_type = dt_mode_to_dft(S_IFREG);
665 fo = dt_find_or_create(env, ofd->ofd_osd, &info->fti_fid,
666 &info->fti_dof, &info->fti_attr);
668 GOTO(out_seqs, rc = PTR_ERR(fo));
670 ofd->ofd_health_check_file = fo;
675 ofd_seqs_fini(env, ofd);
681 * Cleanup service files on OFD.
683 * This function syncs the whole OFD device and closes the "health check" file.
685 * \param[in] env execution environment
686 * \param[in] ofd OFD device
688 void ofd_fs_cleanup(const struct lu_env *env, struct ofd_device *ofd)
694 ofd_seqs_fini(env, ofd);
696 rc = dt_sync(env, ofd->ofd_osd);
698 CWARN("%s: can't sync OFD upon cleanup: %d\n",
701 if (ofd->ofd_health_check_file) {
702 dt_object_put(env, ofd->ofd_health_check_file);
703 ofd->ofd_health_check_file = NULL;