Whamcloud - gitweb
LU-16524 nodemap: add rbac property to nodemap
[fs/lustre-release.git] / lustre / ptlrpc / nodemap_storage.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (C) 2015, Trustees of Indiana University
24  *
25  * Copyright (c) 2017, Intel Corporation.
26  *
27  * Author: Joshua Walgenbach <jjw@iu.edu>
28  * Author: Kit Westneat <cwestnea@iu.edu>
29  *
30  * Implements the storage functionality for the nodemap configuration. Functions
31  * in this file prepare, store, and load nodemap configuration data. Targets
32  * using nodemap services should register a configuration file object. Nodemap
33  * configuration changes that need to persist should call the appropriate
34  * storage function for the data being modified.
35  *
36  * There are several index types as defined in enum nodemap_idx_type:
37  *      NODEMAP_CLUSTER_IDX     stores the data found on the lu_nodemap struct,
38  *                              like root squash and config flags, as well as
39  *                              the name.
40  *      NODEMAP_RANGE_IDX       stores NID range information for a nodemap
41  *      NODEMAP_UIDMAP_IDX      stores a fs/client UID mapping pair
 *      NODEMAP_GIDMAP_IDX      stores a fs/client GID mapping pair
 *      NODEMAP_PROJIDMAP_IDX   stores a fs/client PROJID mapping pair
43  *      NODEMAP_GLOBAL_IDX      stores whether or not nodemaps are active
44  */
45
46 #include <libcfs/libcfs.h>
47 #include <linux/err.h>
48 #include <linux/kernel.h>
49 #include <linux/list.h>
50 #include <linux/mutex.h>
51 #include <linux/string.h>
52 #include <linux/types.h>
53 #include <uapi/linux/lnet/lnet-types.h>
54 #include <uapi/linux/lustre/lustre_idl.h>
55 #include <dt_object.h>
56 #include <lu_object.h>
57 #include <lustre_net.h>
58 #include <lustre_nodemap.h>
59 #include <obd_class.h>
60 #include <obd_support.h>
61 #include "nodemap_internal.h"
62
/* list of registered nodemap index files, except MGS */
static LIST_HEAD(ncf_list_head);
/* NOTE(review): presumably serializes access to ncf_list_head - confirm */
static DEFINE_MUTEX(ncf_list_lock);

/*
 * MGS index is different than others, others are listeners to MGS idx.
 * The nodemap_idx_* storage helpers below write to nodemap_mgs_ncf->ncf_obj
 * and fail with -EINVAL while this pointer is NULL.
 */
static struct nm_config_file *nodemap_mgs_ncf;
69
70 static void nodemap_cluster_key_init(struct nodemap_key *nk, unsigned int nm_id,
71                                      enum nodemap_cluster_rec_subid subid)
72 {
73         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id,
74                                                         NODEMAP_CLUSTER_IDX));
75         nk->nk_cluster_subid = subid;
76 }
77
78 static void nodemap_cluster_rec_init(union nodemap_rec *nr,
79                                      const struct lu_nodemap *nodemap)
80 {
81         BUILD_BUG_ON(sizeof(nr->ncr.ncr_name) != sizeof(nodemap->nm_name));
82
83         strncpy(nr->ncr.ncr_name, nodemap->nm_name, sizeof(nr->ncr.ncr_name));
84         nr->ncr.ncr_squash_uid = cpu_to_le32(nodemap->nm_squash_uid);
85         nr->ncr.ncr_squash_gid = cpu_to_le32(nodemap->nm_squash_gid);
86         nr->ncr.ncr_squash_projid = cpu_to_le32(nodemap->nm_squash_projid);
87         nr->ncr.ncr_flags =
88                 (nodemap->nmf_trust_client_ids ?
89                         NM_FL_TRUST_CLIENT_IDS : 0) |
90                 (nodemap->nmf_allow_root_access ?
91                         NM_FL_ALLOW_ROOT_ACCESS : 0) |
92                 (nodemap->nmf_deny_unknown ?
93                         NM_FL_DENY_UNKNOWN : 0) |
94                 (nodemap->nmf_map_mode & NODEMAP_MAP_UID ?
95                         NM_FL_MAP_UID : 0) |
96                 (nodemap->nmf_map_mode & NODEMAP_MAP_GID ?
97                         NM_FL_MAP_GID : 0) |
98                 (nodemap->nmf_map_mode & NODEMAP_MAP_PROJID ?
99                         NM_FL_MAP_PROJID : 0) |
100                 (nodemap->nmf_enable_audit ?
101                         NM_FL_ENABLE_AUDIT : 0) |
102                 (nodemap->nmf_forbid_encryption ?
103                         NM_FL_FORBID_ENCRYPT : 0);
104         nr->ncr.ncr_flags2 =
105                 (nodemap->nmf_readonly_mount ?
106                         NM_FL2_READONLY_MOUNT : 0);
107 }
108
109 static void nodemap_cluster_roles_rec_init(union nodemap_rec *nr,
110                                            const struct lu_nodemap *nodemap)
111 {
112         struct nodemap_cluster_roles_rec *ncrr = &nr->ncrr;
113
114         memset(ncrr, 0, sizeof(struct nodemap_cluster_roles_rec));
115         ncrr->ncrr_roles = cpu_to_le64(nodemap->nmf_rbac);
116 }
117
118 static void nodemap_idmap_key_init(struct nodemap_key *nk, unsigned int nm_id,
119                                    enum nodemap_id_type id_type,
120                                    u32 id_client)
121 {
122         enum nodemap_idx_type idx_type;
123
124         if (id_type == NODEMAP_UID)
125                 idx_type = NODEMAP_UIDMAP_IDX;
126         else if (id_type == NODEMAP_GID)
127                 idx_type = NODEMAP_GIDMAP_IDX;
128         else if (id_type == NODEMAP_PROJID)
129                 idx_type = NODEMAP_PROJIDMAP_IDX;
130         else
131                 idx_type = NODEMAP_EMPTY_IDX;
132
133         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id, idx_type));
134         nk->nk_id_client = cpu_to_le32(id_client);
135 }
136
137 static void nodemap_idmap_rec_init(union nodemap_rec *nr, u32 id_fs)
138 {
139         nr->nir.nir_id_fs = cpu_to_le32(id_fs);
140 }
141
142 static void nodemap_range_key_init(struct nodemap_key *nk, unsigned int nm_id,
143                                    unsigned int rn_id)
144 {
145         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id,
146                                                         NODEMAP_RANGE_IDX));
147         nk->nk_range_id = cpu_to_le32(rn_id);
148 }
149
150 static void nodemap_range_rec_init(union nodemap_rec *nr,
151                                    const lnet_nid_t nid[2])
152 {
153         nr->nrr.nrr_start_nid = cpu_to_le64(nid[0]);
154         nr->nrr.nrr_end_nid = cpu_to_le64(nid[1]);
155 }
156
157 static void nodemap_global_key_init(struct nodemap_key *nk)
158 {
159         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(0, NODEMAP_GLOBAL_IDX));
160         nk->nk_unused = 0;
161 }
162
163 static void nodemap_global_rec_init(union nodemap_rec *nr, bool active)
164 {
165         nr->ngr.ngr_is_active = active;
166 }
167
168 /* should be called with dt_write lock */
169 static void nodemap_inc_version(const struct lu_env *env,
170                                 struct dt_object *nodemap_idx,
171                                 struct thandle *th)
172 {
173         u64 ver = dt_version_get(env, nodemap_idx);
174         dt_version_set(env, nodemap_idx, ver + 1, th);
175 }
176
/* mode argument of nodemap_cache_find_create() */
enum ncfc_find_create {
        /* unlink an index found on disk and create a fresh one */
        NCFC_CREATE_NEW = 1,
};
180
181 static struct dt_object *nodemap_cache_find_create(const struct lu_env *env,
182                                                    struct dt_device *dev,
183                                                    struct local_oid_storage *los,
184                                                    enum ncfc_find_create create_new)
185 {
186         struct lu_fid tfid;
187         struct dt_object *root_obj;
188         struct dt_object *nm_obj;
189         int rc = 0;
190
191         rc = dt_root_get(env, dev, &tfid);
192         if (rc < 0)
193                 GOTO(out, nm_obj = ERR_PTR(rc));
194
195         root_obj = dt_locate(env, dev, &tfid);
196         if (unlikely(IS_ERR(root_obj)))
197                 GOTO(out, nm_obj = root_obj);
198
199         rc = dt_lookup_dir(env, root_obj, LUSTRE_NODEMAP_NAME, &tfid);
200         if (rc == -ENOENT) {
201                 if (dev->dd_rdonly)
202                         GOTO(out_root, nm_obj = ERR_PTR(-EROFS));
203         } else if (rc) {
204                 GOTO(out_root, nm_obj = ERR_PTR(rc));
205         } else if (dev->dd_rdonly && create_new == NCFC_CREATE_NEW) {
206                 GOTO(out_root, nm_obj = ERR_PTR(-EROFS));
207         }
208
209 again:
210         /* if loading index fails the first time, create new index */
211         if (create_new == NCFC_CREATE_NEW && rc != -ENOENT) {
212                 CDEBUG(D_INFO, "removing old index, creating new one\n");
213                 rc = local_object_unlink(env, dev, root_obj,
214                                          LUSTRE_NODEMAP_NAME);
215                 if (rc < 0) {
216                         /* XXX not sure the best way to get obd name. */
217                         CERROR("cannot destroy nodemap index: rc = %d\n",
218                                rc);
219                         GOTO(out_root, nm_obj = ERR_PTR(rc));
220                 }
221         }
222
223         nm_obj = local_index_find_or_create(env, los, root_obj,
224                                                 LUSTRE_NODEMAP_NAME,
225                                                 S_IFREG | S_IRUGO | S_IWUSR,
226                                                 &dt_nodemap_features);
227         if (IS_ERR(nm_obj))
228                 GOTO(out_root, nm_obj);
229
230         if (nm_obj->do_index_ops == NULL) {
231                 rc = nm_obj->do_ops->do_index_try(env, nm_obj,
232                                                       &dt_nodemap_features);
233                 /* even if loading from tgt fails, connecting to MGS will
234                  * rewrite the config
235                  */
236                 if (rc < 0) {
237                         dt_object_put(env, nm_obj);
238
239                         if (create_new == NCFC_CREATE_NEW)
240                                 GOTO(out_root, nm_obj = ERR_PTR(rc));
241
242                         CERROR("cannot load nodemap index from disk, creating "
243                                "new index: rc = %d\n", rc);
244                         create_new = NCFC_CREATE_NEW;
245                         goto again;
246                 }
247         }
248
249 out_root:
250         dt_object_put(env, root_obj);
251 out:
252         return nm_obj;
253 }
254
255 static int nodemap_idx_insert(const struct lu_env *env,
256                               struct dt_object *idx,
257                               const struct nodemap_key *nk,
258                               const union nodemap_rec *nr)
259 {
260         struct thandle *th;
261         struct dt_device *dev = lu2dt_dev(idx->do_lu.lo_dev);
262         int rc;
263
264         BUILD_BUG_ON(sizeof(union nodemap_rec) != 32);
265
266         th = dt_trans_create(env, dev);
267
268         if (IS_ERR(th))
269                 GOTO(out, rc = PTR_ERR(th));
270
271         rc = dt_declare_insert(env, idx,
272                                (const struct dt_rec *)nr,
273                                (const struct dt_key *)nk, th);
274         if (rc != 0)
275                 GOTO(out, rc);
276
277         rc = dt_declare_version_set(env, idx, th);
278         if (rc != 0)
279                 GOTO(out, rc);
280
281         rc = dt_trans_start_local(env, dev, th);
282         if (rc != 0)
283                 GOTO(out, rc);
284
285         dt_write_lock(env, idx, 0);
286
287         rc = dt_insert(env, idx, (const struct dt_rec *)nr,
288                        (const struct dt_key *)nk, th);
289
290         nodemap_inc_version(env, idx, th);
291         dt_write_unlock(env, idx);
292 out:
293         dt_trans_stop(env, dev, th);
294
295         return rc;
296 }
297
298 static int nodemap_idx_update(const struct lu_env *env,
299                               struct dt_object *idx,
300                               const struct nodemap_key *nk,
301                               const union nodemap_rec *nr)
302 {
303         struct thandle          *th;
304         struct dt_device        *dev = lu2dt_dev(idx->do_lu.lo_dev);
305         int                      rc = 0;
306
307         th = dt_trans_create(env, dev);
308
309         if (IS_ERR(th))
310                 GOTO(out, rc = PTR_ERR(th));
311
312         rc = dt_declare_delete(env, idx, (const struct dt_key *)nk, th);
313         if (rc != 0)
314                 GOTO(out, rc);
315
316         rc = dt_declare_insert(env, idx, (const struct dt_rec *)nr,
317                                (const struct dt_key *)nk, th);
318         if (rc != 0)
319                 GOTO(out, rc);
320
321         rc = dt_declare_version_set(env, idx, th);
322         if (rc != 0)
323                 GOTO(out, rc);
324
325         rc = dt_trans_start_local(env, dev, th);
326         if (rc != 0)
327                 GOTO(out, rc);
328
329         dt_write_lock(env, idx, 0);
330
331         rc = dt_delete(env, idx, (const struct dt_key *)nk, th);
332         if (rc != 0)
333                 GOTO(out_lock, rc);
334
335         rc = dt_insert(env, idx, (const struct dt_rec *)nr,
336                        (const struct dt_key *)nk, th);
337         if (rc != 0)
338                 GOTO(out_lock, rc);
339
340         nodemap_inc_version(env, idx, th);
341 out_lock:
342         dt_write_unlock(env, idx);
343 out:
344         dt_trans_stop(env, dev, th);
345
346         return rc;
347 }
348
349 static int nodemap_idx_delete(const struct lu_env *env,
350                               struct dt_object *idx,
351                               const struct nodemap_key *nk,
352                               const union nodemap_rec *unused)
353 {
354         struct thandle          *th;
355         struct dt_device        *dev = lu2dt_dev(idx->do_lu.lo_dev);
356         int                      rc = 0;
357
358         th = dt_trans_create(env, dev);
359
360         if (IS_ERR(th))
361                 GOTO(out, rc = PTR_ERR(th));
362
363         rc = dt_declare_delete(env, idx, (const struct dt_key *)nk, th);
364         if (rc != 0)
365                 GOTO(out, rc);
366
367         rc = dt_declare_version_set(env, idx, th);
368         if (rc != 0)
369                 GOTO(out, rc);
370
371         rc = dt_trans_start_local(env, dev, th);
372         if (rc != 0)
373                 GOTO(out, rc);
374
375         dt_write_lock(env, idx, 0);
376
377         rc = dt_delete(env, idx, (const struct dt_key *)nk, th);
378
379         nodemap_inc_version(env, idx, th);
380
381         dt_write_unlock(env, idx);
382 out:
383         dt_trans_stop(env, dev, th);
384
385         return rc;
386 }
387
/* selects how the *_add_update() helpers store a record */
enum nm_add_update {
        /* store through nodemap_idx_insert() */
        NM_ADD = 0,
        /* store through nodemap_idx_update() */
        NM_UPDATE = 1,
};
392
393 static int nodemap_idx_cluster_add_update(const struct lu_nodemap *nodemap,
394                                           struct dt_object *idx,
395                                           enum nm_add_update update,
396                                           enum nodemap_cluster_rec_subid subid)
397 {
398         struct nodemap_key nk;
399         union nodemap_rec nr;
400         struct lu_env env;
401         int rc = 0;
402
403         ENTRY;
404
405         if (idx == NULL) {
406                 if (nodemap_mgs_ncf == NULL) {
407                         CERROR("cannot add nodemap config to non-existing MGS.\n");
408                         return -EINVAL;
409                 }
410                 idx = nodemap_mgs_ncf->ncf_obj;
411         }
412
413         rc = lu_env_init(&env, LCT_LOCAL);
414         if (rc)
415                 RETURN(rc);
416
417         nodemap_cluster_key_init(&nk, nodemap->nm_id, subid);
418         switch (subid) {
419         case NODEMAP_CLUSTER_REC:
420                 nodemap_cluster_rec_init(&nr, nodemap);
421                 break;
422         case NODEMAP_CLUSTER_ROLES:
423                 nodemap_cluster_roles_rec_init(&nr, nodemap);
424                 break;
425         default:
426                 CWARN("%s: unknown subtype %u\n", nodemap->nm_name, subid);
427                 GOTO(fini, rc = -EINVAL);
428         }
429
430         if (update == NM_UPDATE)
431                 rc = nodemap_idx_update(&env, idx, &nk, &nr);
432         else
433                 rc = nodemap_idx_insert(&env, idx, &nk, &nr);
434
435 fini:
436         lu_env_fini(&env);
437         RETURN(rc);
438 }
439
440 int nodemap_idx_nodemap_add(const struct lu_nodemap *nodemap)
441 {
442         return nodemap_idx_cluster_add_update(nodemap, NULL,
443                                               NM_ADD, NODEMAP_CLUSTER_REC);
444 }
445
446 int nodemap_idx_nodemap_update(const struct lu_nodemap *nodemap)
447 {
448         return nodemap_idx_cluster_add_update(nodemap, NULL,
449                                               NM_UPDATE, NODEMAP_CLUSTER_REC);
450 }
451
452 int nodemap_idx_nodemap_del(const struct lu_nodemap *nodemap)
453 {
454         struct rb_root           root;
455         struct lu_idmap         *idmap;
456         struct lu_idmap         *temp;
457         struct lu_nid_range     *range;
458         struct lu_nid_range     *range_temp;
459         struct nodemap_key       nk;
460         struct lu_env            env;
461         int                      rc = 0;
462         int                      rc2 = 0;
463
464         ENTRY;
465
466         if (nodemap_mgs_ncf == NULL) {
467                 CERROR("cannot add nodemap config to non-existing MGS.\n");
468                 return -EINVAL;
469         }
470
471         rc = lu_env_init(&env, LCT_LOCAL);
472         if (rc != 0)
473                 RETURN(rc);
474
475         nodemap_cluster_key_init(&nk, nodemap->nm_id, NODEMAP_CLUSTER_ROLES);
476         rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
477         if (rc2 < 0 && rc2 != -ENOENT)
478                 rc = rc2;
479
480         root = nodemap->nm_fs_to_client_uidmap;
481         nm_rbtree_postorder_for_each_entry_safe(idmap, temp, &root,
482                                                 id_fs_to_client) {
483                 nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_UID,
484                                        idmap->id_client);
485                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
486                                          &nk, NULL);
487                 if (rc2 < 0)
488                         rc = rc2;
489         }
490
491         root = nodemap->nm_client_to_fs_gidmap;
492         nm_rbtree_postorder_for_each_entry_safe(idmap, temp, &root,
493                                                 id_client_to_fs) {
494                 nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_GID,
495                                        idmap->id_client);
496                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
497                                          &nk, NULL);
498                 if (rc2 < 0)
499                         rc = rc2;
500         }
501
502         root = nodemap->nm_client_to_fs_projidmap;
503         nm_rbtree_postorder_for_each_entry_safe(idmap, temp, &root,
504                                                 id_client_to_fs) {
505                 nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_PROJID,
506                                        idmap->id_client);
507                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
508                                          &nk, NULL);
509                 if (rc2 < 0)
510                         rc = rc2;
511         }
512
513         list_for_each_entry_safe(range, range_temp, &nodemap->nm_ranges,
514                                  rn_list) {
515                 nodemap_range_key_init(&nk, nodemap->nm_id, range->rn_id);
516                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
517                                          &nk, NULL);
518                 if (rc2 < 0)
519                         rc = rc2;
520         }
521
522         nodemap_cluster_key_init(&nk, nodemap->nm_id, NODEMAP_CLUSTER_REC);
523         rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
524         if (rc2 < 0)
525                 rc = rc2;
526
527         lu_env_fini(&env);
528
529         RETURN(rc);
530 }
531
532 int nodemap_idx_cluster_roles_add(const struct lu_nodemap *nodemap)
533 {
534         return nodemap_idx_cluster_add_update(nodemap, NULL, NM_ADD,
535                                               NODEMAP_CLUSTER_ROLES);
536 }
537
538 int nodemap_idx_cluster_roles_update(const struct lu_nodemap *nodemap)
539 {
540         return nodemap_idx_cluster_add_update(nodemap, NULL, NM_UPDATE,
541                                               NODEMAP_CLUSTER_ROLES);
542 }
543
544 int nodemap_idx_cluster_roles_del(const struct lu_nodemap *nodemap)
545 {
546         struct nodemap_key nk;
547         struct lu_env env;
548         int rc = 0;
549
550         ENTRY;
551
552         if (nodemap_mgs_ncf == NULL) {
553                 CERROR("cannot add nodemap config to non-existing MGS.\n");
554                 return -EINVAL;
555         }
556
557         rc = lu_env_init(&env, LCT_LOCAL);
558         if (rc != 0)
559                 RETURN(rc);
560
561         nodemap_cluster_key_init(&nk, nodemap->nm_id, NODEMAP_CLUSTER_ROLES);
562         rc = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
563
564         lu_env_fini(&env);
565         RETURN(rc);
566 }
567
568 int nodemap_idx_range_add(const struct lu_nid_range *range,
569                           const lnet_nid_t nid[2])
570 {
571         struct nodemap_key       nk;
572         union nodemap_rec        nr;
573         struct lu_env            env;
574         int                      rc = 0;
575         ENTRY;
576
577         if (nodemap_mgs_ncf == NULL) {
578                 CERROR("cannot add nodemap config to non-existing MGS.\n");
579                 return -EINVAL;
580         }
581
582         rc = lu_env_init(&env, LCT_LOCAL);
583         if (rc != 0)
584                 RETURN(rc);
585
586         nodemap_range_key_init(&nk, range->rn_nodemap->nm_id, range->rn_id);
587         nodemap_range_rec_init(&nr, nid);
588
589         rc = nodemap_idx_insert(&env, nodemap_mgs_ncf->ncf_obj, &nk, &nr);
590         lu_env_fini(&env);
591
592         RETURN(rc);
593 }
594
595 int nodemap_idx_range_del(const struct lu_nid_range *range)
596 {
597         struct nodemap_key       nk;
598         struct lu_env            env;
599         int                      rc = 0;
600         ENTRY;
601
602         if (nodemap_mgs_ncf == NULL) {
603                 CERROR("cannot add nodemap config to non-existing MGS.\n");
604                 return -EINVAL;
605         }
606
607         rc = lu_env_init(&env, LCT_LOCAL);
608         if (rc != 0)
609                 RETURN(rc);
610
611         nodemap_range_key_init(&nk, range->rn_nodemap->nm_id, range->rn_id);
612
613         rc = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
614         lu_env_fini(&env);
615
616         RETURN(rc);
617 }
618
619 int nodemap_idx_idmap_add(const struct lu_nodemap *nodemap,
620                           enum nodemap_id_type id_type,
621                           const u32 map[2])
622 {
623         struct nodemap_key       nk;
624         union nodemap_rec        nr;
625         struct lu_env            env;
626         int                      rc = 0;
627         ENTRY;
628
629         if (nodemap_mgs_ncf == NULL) {
630                 CERROR("cannot add nodemap config to non-existing MGS.\n");
631                 return -EINVAL;
632         }
633
634         rc = lu_env_init(&env, LCT_LOCAL);
635         if (rc != 0)
636                 RETURN(rc);
637
638         nodemap_idmap_key_init(&nk, nodemap->nm_id, id_type, map[0]);
639         nodemap_idmap_rec_init(&nr, map[1]);
640
641         rc = nodemap_idx_insert(&env, nodemap_mgs_ncf->ncf_obj, &nk, &nr);
642         lu_env_fini(&env);
643
644         RETURN(rc);
645 }
646
647 int nodemap_idx_idmap_del(const struct lu_nodemap *nodemap,
648                           enum nodemap_id_type id_type,
649                           const u32 map[2])
650 {
651         struct nodemap_key       nk;
652         struct lu_env            env;
653         int                      rc = 0;
654         ENTRY;
655
656         if (nodemap_mgs_ncf == NULL) {
657                 CERROR("cannot add nodemap config to non-existing MGS.\n");
658                 return -EINVAL;
659         }
660
661         rc = lu_env_init(&env, LCT_LOCAL);
662         if (rc != 0)
663                 RETURN(rc);
664
665         nodemap_idmap_key_init(&nk, nodemap->nm_id, id_type, map[0]);
666
667         rc = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
668         lu_env_fini(&env);
669
670         RETURN(rc);
671 }
672
673 static int nodemap_idx_global_add_update(bool value, enum nm_add_update update)
674 {
675         struct nodemap_key       nk;
676         union nodemap_rec        nr;
677         struct lu_env            env;
678         int                      rc = 0;
679         ENTRY;
680
681         if (nodemap_mgs_ncf == NULL) {
682                 CERROR("cannot add nodemap config to non-existing MGS.\n");
683                 return -EINVAL;
684         }
685
686         rc = lu_env_init(&env, LCT_LOCAL);
687         if (rc != 0)
688                 RETURN(rc);
689
690         nodemap_global_key_init(&nk);
691         nodemap_global_rec_init(&nr, value);
692
693         if (update == NM_UPDATE)
694                 rc = nodemap_idx_update(&env, nodemap_mgs_ncf->ncf_obj,
695                                         &nk, &nr);
696         else
697                 rc = nodemap_idx_insert(&env, nodemap_mgs_ncf->ncf_obj,
698                                         &nk, &nr);
699
700         lu_env_fini(&env);
701
702         RETURN(rc);
703 }
704
705 int nodemap_idx_nodemap_activate(bool value)
706 {
707         return nodemap_idx_global_add_update(value, NM_UPDATE);
708 }
709
710 static enum nodemap_idx_type nodemap_get_key_type(const struct nodemap_key *key)
711 {
712         u32                      nodemap_id;
713
714         nodemap_id = le32_to_cpu(key->nk_nodemap_id);
715         return nm_idx_get_type(nodemap_id);
716 }
717
718 static int nodemap_get_key_subtype(const struct nodemap_key *key)
719 {
720         enum nodemap_idx_type type = nodemap_get_key_type(key);
721
722         return type == NODEMAP_CLUSTER_IDX ? key->nk_cluster_subid : -1;
723 }
724
725 static int nodemap_cluster_rec_helper(struct nodemap_config *config,
726                                       u32 nodemap_id,
727                                       const union nodemap_rec *rec,
728                                       struct lu_nodemap **recent_nodemap)
729 {
730         struct lu_nodemap *nodemap, *old_nm;
731         enum nm_flag_bits flags;
732         enum nm_flag2_bits flags2;
733
734         nodemap = cfs_hash_lookup(config->nmc_nodemap_hash, rec->ncr.ncr_name);
735         if (nodemap == NULL) {
736                 if (nodemap_id == LUSTRE_NODEMAP_DEFAULT_ID)
737                         nodemap = nodemap_create(rec->ncr.ncr_name, config, 1);
738                 else
739                         nodemap = nodemap_create(rec->ncr.ncr_name, config, 0);
740                 if (IS_ERR(nodemap))
741                         return PTR_ERR(nodemap);
742
743                 /* we need to override the local ID with the saved ID */
744                 nodemap->nm_id = nodemap_id;
745                 if (nodemap_id > config->nmc_nodemap_highest_id)
746                         config->nmc_nodemap_highest_id = nodemap_id;
747
748         } else if (nodemap->nm_id != nodemap_id) {
749                 nodemap_putref(nodemap);
750                 return -EINVAL;
751         }
752
753         nodemap->nm_squash_uid = le32_to_cpu(rec->ncr.ncr_squash_uid);
754         nodemap->nm_squash_gid = le32_to_cpu(rec->ncr.ncr_squash_gid);
755         nodemap->nm_squash_projid = le32_to_cpu(rec->ncr.ncr_squash_projid);
756
757         flags = rec->ncr.ncr_flags;
758         nodemap->nmf_allow_root_access = flags & NM_FL_ALLOW_ROOT_ACCESS;
759         nodemap->nmf_trust_client_ids = flags & NM_FL_TRUST_CLIENT_IDS;
760         nodemap->nmf_deny_unknown = flags & NM_FL_DENY_UNKNOWN;
761         nodemap->nmf_map_mode =
762                 (flags & NM_FL_MAP_UID ? NODEMAP_MAP_UID : 0) |
763                 (flags & NM_FL_MAP_GID ? NODEMAP_MAP_GID : 0) |
764                 (flags & NM_FL_MAP_PROJID ? NODEMAP_MAP_PROJID : 0);
765         if (nodemap->nmf_map_mode == NODEMAP_MAP_BOTH_LEGACY)
766                 nodemap->nmf_map_mode = NODEMAP_MAP_BOTH;
767         nodemap->nmf_enable_audit = flags & NM_FL_ENABLE_AUDIT;
768         nodemap->nmf_forbid_encryption = flags & NM_FL_FORBID_ENCRYPT;
769         flags2 = rec->ncr.ncr_flags2;
770         nodemap->nmf_readonly_mount = flags2 & NM_FL2_READONLY_MOUNT;
771         /* by default, and in the absence of cluster_roles, grant all roles */
772         nodemap->nmf_rbac = NODEMAP_RBAC_ALL;
773
774         /* The fileset should be saved otherwise it will be empty
775          * every time in case of "NODEMAP_CLUSTER_IDX".
776          */
777         mutex_lock(&active_config_lock);
778         old_nm = nodemap_lookup(rec->ncr.ncr_name);
779         if (!IS_ERR(old_nm) && old_nm->nm_fileset[0] != '\0')
780                 strlcpy(nodemap->nm_fileset, old_nm->nm_fileset,
781                         sizeof(nodemap->nm_fileset));
782         mutex_unlock(&active_config_lock);
783         if (!IS_ERR(old_nm))
784                 nodemap_putref(old_nm);
785
786         if (*recent_nodemap == NULL) {
787                 *recent_nodemap = nodemap;
788                 INIT_LIST_HEAD(&nodemap->nm_list);
789         } else {
790                 list_add(&nodemap->nm_list, &(*recent_nodemap)->nm_list);
791         }
792         nodemap_putref(nodemap);
793
794         return 0;
795 }
796
797 static int nodemap_cluster_roles_helper(struct lu_nodemap *nodemap,
798                                         const union nodemap_rec *rec)
799 {
800         nodemap->nmf_rbac = le64_to_cpu(rec->ncrr.ncrr_roles);
801
802         return 0;
803 }
804
805 /**
806  * Process a key/rec pair and modify the new configuration.
807  *
808  * \param       config          configuration to update with this key/rec data
809  * \param       key             key of the record that was loaded
810  * \param       rec             record that was loaded
811  * \param       recent_nodemap  last referenced nodemap
812  * \retval      type of record processed, see enum #nodemap_idx_type
813  * \retval      -ENOENT         range or map loaded before nodemap record
814  * \retval      -EINVAL         duplicate nodemap cluster records found with
815  *                              different IDs, or nodemap has invalid name
816  * \retval      -ENOMEM
817  */
818 static int nodemap_process_keyrec(struct nodemap_config *config,
819                                   const struct nodemap_key *key,
820                                   const union nodemap_rec *rec,
821                                   struct lu_nodemap **recent_nodemap)
822 {
823         struct lu_nodemap *nodemap = NULL;
824         enum nodemap_idx_type type;
825         enum nodemap_id_type id_type;
826         int subtype;
827         u32 nodemap_id;
828         lnet_nid_t nid[2];
829         u32 map[2];
830         int rc;
831
832         ENTRY;
833
834         BUILD_BUG_ON(sizeof(union nodemap_rec) != 32);
835
836         nodemap_id = le32_to_cpu(key->nk_nodemap_id);
837         type = nodemap_get_key_type(key);
838         subtype = nodemap_get_key_subtype(key);
839         nodemap_id = nm_idx_set_type(nodemap_id, 0);
840
841         CDEBUG(D_INFO, "found config entry, nm_id %d type %d subtype %d\n",
842                nodemap_id, type, subtype);
843
844         /* find the correct nodemap in the load list */
845         if (type == NODEMAP_RANGE_IDX || type == NODEMAP_UIDMAP_IDX ||
846             type == NODEMAP_GIDMAP_IDX || type == NODEMAP_PROJIDMAP_IDX ||
847             (type == NODEMAP_CLUSTER_IDX && subtype != NODEMAP_CLUSTER_REC)) {
848                 struct lu_nodemap *tmp = NULL;
849
850                 nodemap = *recent_nodemap;
851
852                 if (nodemap == NULL)
853                         GOTO(out, rc = -ENOENT);
854
855                 if (nodemap->nm_id != nodemap_id) {
856                         list_for_each_entry(tmp, &nodemap->nm_list, nm_list)
857                                 if (tmp->nm_id == nodemap_id) {
858                                         nodemap = tmp;
859                                         break;
860                                 }
861
862                         if (nodemap->nm_id != nodemap_id)
863                                 GOTO(out, rc = -ENOENT);
864                 }
865
866                 /* update most recently used nodemap if necessay */
867                 if (nodemap != *recent_nodemap)
868                         *recent_nodemap = nodemap;
869         }
870
871         switch (type) {
872         case NODEMAP_EMPTY_IDX:
873                 if (nodemap_id != 0)
874                         CWARN("Found nodemap config record without type field, "
875                               " nodemap_id=%d. nodemap config file corrupt?\n",
876                               nodemap_id);
877                 break;
878         case NODEMAP_CLUSTER_IDX:
879                 switch (nodemap_get_key_subtype(key)) {
880                 case NODEMAP_CLUSTER_REC:
881                         rc = nodemap_cluster_rec_helper(config, nodemap_id, rec,
882                                                         recent_nodemap);
883                         if (rc != 0)
884                                 GOTO(out, rc);
885                         break;
886                 case NODEMAP_CLUSTER_ROLES:
887                         rc = nodemap_cluster_roles_helper(nodemap, rec);
888                         if (rc != 0)
889                                 GOTO(out, rc);
890                         break;
891                 default:
892                         CWARN("%s: ignoring keyrec of type %d with subtype %u\n",
893                               nodemap->nm_name, NODEMAP_CLUSTER_IDX,
894                               nodemap_get_key_subtype(key));
895                         break;
896                 }
897                 break;
898         case NODEMAP_RANGE_IDX:
899                 nid[0] = le64_to_cpu(rec->nrr.nrr_start_nid);
900                 nid[1] = le64_to_cpu(rec->nrr.nrr_end_nid);
901
902                 rc = nodemap_add_range_helper(config, nodemap, nid,
903                                         le32_to_cpu(key->nk_range_id));
904                 if (rc != 0)
905                         GOTO(out, rc);
906                 break;
907         case NODEMAP_UIDMAP_IDX:
908         case NODEMAP_GIDMAP_IDX:
909         case NODEMAP_PROJIDMAP_IDX:
910                 map[0] = le32_to_cpu(key->nk_id_client);
911                 map[1] = le32_to_cpu(rec->nir.nir_id_fs);
912
913                 if (type == NODEMAP_UIDMAP_IDX)
914                         id_type = NODEMAP_UID;
915                 else if (type == NODEMAP_GIDMAP_IDX)
916                         id_type = NODEMAP_GID;
917                 else if (type == NODEMAP_PROJIDMAP_IDX)
918                         id_type = NODEMAP_PROJID;
919                 else
920                         GOTO(out, rc = -EINVAL);
921
922                 rc = nodemap_add_idmap_helper(nodemap, id_type, map);
923                 if (rc != 0)
924                         GOTO(out, rc);
925                 break;
926         case NODEMAP_GLOBAL_IDX:
927                 switch (key->nk_unused) {
928                 case 0:
929                         config->nmc_nodemap_is_active = rec->ngr.ngr_is_active;
930                         break;
931                 default:
932                         CWARN("%s: ignoring keyrec of type %d with subtype %u\n",
933                               recent_nodemap ?
934                                (*recent_nodemap)->nm_name : "nodemap",
935                               NODEMAP_GLOBAL_IDX, key->nk_unused);
936                         break;
937                 }
938                 break;
939         default:
940                 CWARN("%s: ignoring key %u:%u for unknown type %u\n",
941                       recent_nodemap ? (*recent_nodemap)->nm_name : "nodemap",
942                       key->nk_nodemap_id & 0x0FFFFFFF, key->nk_unused, type);
943                 break;
944         }
945
946         rc = type;
947
948         EXIT;
949
950 out:
951         return rc;
952 }
953
/*
 * The nodemap index is walked in two passes, see nodemap_load_entries() and
 * nodemap_page_build().
 */
enum nm_config_passes {
	/* first pass: only NODEMAP_CLUSTER_IDX / NODEMAP_CLUSTER_REC records */
	NM_READ_CLUSTERS	= 0,
	/* second pass: all remaining records except NODEMAP_EMPTY_IDX */
	NM_READ_ATTRIBUTES	= 1,
};
958
959 static int nodemap_load_entries(const struct lu_env *env,
960                                 struct dt_object *nodemap_idx)
961 {
962         const struct dt_it_ops *iops;
963         struct dt_it *it;
964         struct lu_nodemap *recent_nodemap = NULL;
965         struct nodemap_config *new_config = NULL;
966         u64 hash = 0;
967         bool activate_nodemap = false;
968         bool loaded_global_idx = false;
969         enum nm_config_passes cur_pass = NM_READ_CLUSTERS;
970         int rc = 0;
971
972         ENTRY;
973
974         iops = &nodemap_idx->do_index_ops->dio_it;
975
976         dt_read_lock(env, nodemap_idx, 0);
977         it = iops->init(env, nodemap_idx, 0);
978         if (IS_ERR(it))
979                 GOTO(out, rc = PTR_ERR(it));
980
981         rc = iops->load(env, it, hash);
982         if (rc < 0)
983                 GOTO(out_iops_fini, rc);
984
985         /* rc == 0 means we need to advance to record */
986         if (rc == 0) {
987                 rc = iops->next(env, it);
988
989                 if (rc < 0)
990                         GOTO(out_iops_put, rc);
991                 /* rc > 0 is eof, will be checked in while below */
992         } else {
993                 /* rc == 1, we found initial record and can process below */
994                 rc = 0;
995         }
996
997         new_config = nodemap_config_alloc();
998         if (IS_ERR(new_config)) {
999                 rc = PTR_ERR(new_config);
1000                 new_config = NULL;
1001                 GOTO(out_iops_put, rc);
1002         }
1003
1004         /* rc > 0 is eof, check initial iops->next here as well */
1005         while (rc == 0) {
1006                 struct nodemap_key *key;
1007                 union nodemap_rec rec;
1008                 enum nodemap_idx_type key_type;
1009                 int sub_type;
1010
1011                 key = (struct nodemap_key *)iops->key(env, it);
1012                 key_type = nodemap_get_key_type((struct nodemap_key *)key);
1013                 sub_type = nodemap_get_key_subtype((struct nodemap_key *)key);
1014                 if ((cur_pass == NM_READ_CLUSTERS &&
1015                      key_type == NODEMAP_CLUSTER_IDX &&
1016                      sub_type == NODEMAP_CLUSTER_REC) ||
1017                     (cur_pass == NM_READ_ATTRIBUTES &&
1018                      (key_type != NODEMAP_CLUSTER_IDX ||
1019                       sub_type != NODEMAP_CLUSTER_REC) &&
1020                      key_type != NODEMAP_EMPTY_IDX)) {
1021                         rc = iops->rec(env, it, (struct dt_rec *)&rec, 0);
1022                         if (rc != -ESTALE) {
1023                                 if (rc != 0)
1024                                         GOTO(out_nodemap_config, rc);
1025                                 rc = nodemap_process_keyrec(new_config, key, &rec,
1026                                                             &recent_nodemap);
1027                                 if (rc < 0)
1028                                         GOTO(out_nodemap_config, rc);
1029                                 if (rc == NODEMAP_GLOBAL_IDX)
1030                                         loaded_global_idx = true;
1031                         }
1032                 }
1033
1034                 do
1035                         rc = iops->next(env, it);
1036                 while (rc == -ESTALE);
1037
1038                 /* move to second pass */
1039                 if (rc > 0 && cur_pass == NM_READ_CLUSTERS) {
1040                         cur_pass = NM_READ_ATTRIBUTES;
1041                         rc = iops->load(env, it, 0);
1042                         if (rc == 0)
1043                                 rc = iops->next(env, it);
1044                         else if (rc > 0)
1045                                 rc = 0;
1046                         else
1047                                 GOTO(out, rc);
1048                 }
1049         }
1050
1051         if (rc > 0)
1052                 rc = 0;
1053
1054 out_nodemap_config:
1055         if (rc != 0)
1056                 nodemap_config_dealloc(new_config);
1057         else
1058                 /* creating new default needs to be done outside dt read lock */
1059                 activate_nodemap = true;
1060 out_iops_put:
1061         iops->put(env, it);
1062 out_iops_fini:
1063         iops->fini(env, it);
1064 out:
1065         dt_read_unlock(env, nodemap_idx);
1066
1067         if (rc != 0)
1068                 CWARN("%s: failed to load nodemap configuration: rc = %d\n",
1069                       nodemap_idx->do_lu.lo_dev->ld_obd->obd_name, rc);
1070
1071         if (!activate_nodemap)
1072                 RETURN(rc);
1073
1074         if (new_config->nmc_default_nodemap == NULL) {
1075                 /* new MGS won't have a default nm on disk, so create it here */
1076                 struct lu_nodemap *nodemap =
1077                         nodemap_create(DEFAULT_NODEMAP, new_config, 1);
1078                 if (IS_ERR(nodemap)) {
1079                         rc = PTR_ERR(nodemap);
1080                 } else {
1081                         rc = nodemap_idx_cluster_add_update(
1082                                         new_config->nmc_default_nodemap,
1083                                         nodemap_idx,
1084                                         NM_ADD, NODEMAP_CLUSTER_REC);
1085                         nodemap_putref(new_config->nmc_default_nodemap);
1086                 }
1087         }
1088
1089         /* new nodemap config won't have an active/inactive record */
1090         if (rc == 0 && loaded_global_idx == false) {
1091                 struct nodemap_key       nk;
1092                 union nodemap_rec        nr;
1093
1094                 nodemap_global_key_init(&nk);
1095                 nodemap_global_rec_init(&nr, false);
1096                 rc = nodemap_idx_insert(env, nodemap_idx, &nk, &nr);
1097         }
1098
1099         if (rc == 0)
1100                 nodemap_config_set_active(new_config);
1101         else
1102                 nodemap_config_dealloc(new_config);
1103
1104         RETURN(rc);
1105 }
1106
1107 /**
1108  * Step through active config and write to disk.
1109  */
1110 struct dt_object *nodemap_save_config_cache(const struct lu_env *env,
1111                                             struct dt_device *dev,
1112                                             struct local_oid_storage *los)
1113 {
1114         struct dt_object *o;
1115         struct lu_nodemap *nodemap;
1116         struct lu_nodemap *nm_tmp;
1117         struct lu_nid_range *range;
1118         struct lu_nid_range *range_temp;
1119         struct lu_idmap *idmap;
1120         struct lu_idmap *id_tmp;
1121         struct rb_root root;
1122         struct nodemap_key nk;
1123         union nodemap_rec nr;
1124         LIST_HEAD(nodemap_list_head);
1125         int rc = 0, rc2;
1126
1127         ENTRY;
1128
1129         /* create a new index file to fill with active config */
1130         o = nodemap_cache_find_create(env, dev, los, NCFC_CREATE_NEW);
1131         if (IS_ERR(o))
1132                 RETURN(o);
1133
1134         mutex_lock(&active_config_lock);
1135
1136         /* convert hash to list so we don't spin */
1137         cfs_hash_for_each_safe(active_config->nmc_nodemap_hash,
1138                                nm_hash_list_cb, &nodemap_list_head);
1139
1140         list_for_each_entry_safe(nodemap, nm_tmp, &nodemap_list_head, nm_list) {
1141                 nodemap_cluster_key_init(&nk, nodemap->nm_id,
1142                                          NODEMAP_CLUSTER_REC);
1143                 nodemap_cluster_rec_init(&nr, nodemap);
1144
1145                 rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1146                 if (rc2 < 0) {
1147                         rc = rc2;
1148                         continue;
1149                 }
1150
1151                 /* only insert NODEMAP_CLUSTER_ROLES idx in saved config cache
1152                  * if nmf_rbac is not default value NODEMAP_RBAC_ALL
1153                  */
1154                 if (nodemap->nmf_rbac != NODEMAP_RBAC_ALL) {
1155                         nodemap_cluster_key_init(&nk, nodemap->nm_id,
1156                                                  NODEMAP_CLUSTER_ROLES);
1157                         nodemap_cluster_roles_rec_init(&nr, nodemap);
1158                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1159                         if (rc2 < 0)
1160                                 rc = rc2;
1161                 }
1162
1163                 down_read(&active_config->nmc_range_tree_lock);
1164                 list_for_each_entry_safe(range, range_temp, &nodemap->nm_ranges,
1165                                          rn_list) {
1166                         lnet_nid_t nid[2] = {
1167                                 range->rn_start,
1168                                 range->rn_end
1169                         };
1170                         nodemap_range_key_init(&nk, nodemap->nm_id,
1171                                                range->rn_id);
1172                         nodemap_range_rec_init(&nr, nid);
1173                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1174                         if (rc2 < 0)
1175                                 rc = rc2;
1176                 }
1177                 up_read(&active_config->nmc_range_tree_lock);
1178
1179                 /* we don't need to take nm_idmap_lock because active config
1180                  * lock prevents changes from happening to nodemaps
1181                  */
1182                 root = nodemap->nm_client_to_fs_uidmap;
1183                 nm_rbtree_postorder_for_each_entry_safe(idmap, id_tmp, &root,
1184                                                         id_client_to_fs) {
1185                         nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_UID,
1186                                                idmap->id_client);
1187                         nodemap_idmap_rec_init(&nr, idmap->id_fs);
1188                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1189                         if (rc2 < 0)
1190                                 rc = rc2;
1191                 }
1192
1193                 root = nodemap->nm_client_to_fs_gidmap;
1194                 nm_rbtree_postorder_for_each_entry_safe(idmap, id_tmp, &root,
1195                                                         id_client_to_fs) {
1196                         nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_GID,
1197                                                idmap->id_client);
1198                         nodemap_idmap_rec_init(&nr, idmap->id_fs);
1199                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1200                         if (rc2 < 0)
1201                                 rc = rc2;
1202                 }
1203
1204                 root = nodemap->nm_client_to_fs_projidmap;
1205                 nm_rbtree_postorder_for_each_entry_safe(idmap, id_tmp, &root,
1206                                                         id_client_to_fs) {
1207                         nodemap_idmap_key_init(&nk, nodemap->nm_id,
1208                                                NODEMAP_PROJID,
1209                                                idmap->id_client);
1210                         nodemap_idmap_rec_init(&nr, idmap->id_fs);
1211                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1212                         if (rc2 < 0)
1213                                 rc = rc2;
1214                 }
1215         }
1216         nodemap_global_key_init(&nk);
1217         nodemap_global_rec_init(&nr, active_config->nmc_nodemap_is_active);
1218         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1219         if (rc2 < 0)
1220                 rc = rc2;
1221
1222         mutex_unlock(&active_config_lock);
1223
1224         if (rc < 0) {
1225                 dt_object_put(env, o);
1226                 o = ERR_PTR(rc);
1227         }
1228
1229         RETURN(o);
1230 }
1231
1232 static void nodemap_save_all_caches(void)
1233 {
1234         struct nm_config_file   *ncf;
1235         struct lu_env            env;
1236         int                      rc = 0;
1237
1238         /* recreating nodemap cache requires fld_thread_key be in env */
1239         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD | LCT_MG_THREAD);
1240         if (rc != 0) {
1241                 CWARN("cannot init env for nodemap config: rc = %d\n", rc);
1242                 return;
1243         }
1244
1245         mutex_lock(&ncf_list_lock);
1246         list_for_each_entry(ncf, &ncf_list_head, ncf_list) {
1247                 struct dt_device *dev = lu2dt_dev(ncf->ncf_obj->do_lu.lo_dev);
1248                 struct obd_device *obd = ncf->ncf_obj->do_lu.lo_dev->ld_obd;
1249                 struct dt_object *o;
1250
1251                 /* put current config file so save conf can rewrite it */
1252                 dt_object_put_nocache(&env, ncf->ncf_obj);
1253                 ncf->ncf_obj = NULL;
1254
1255                 o = nodemap_save_config_cache(&env, dev, ncf->ncf_los);
1256                 if (IS_ERR(o))
1257                         CWARN("%s: error writing to nodemap config: rc = %d\n",
1258                               obd->obd_name, rc);
1259                 else
1260                         ncf->ncf_obj = o;
1261         }
1262         mutex_unlock(&ncf_list_lock);
1263
1264         lu_env_fini(&env);
1265 }
1266
1267 /* tracks if config still needs to be loaded, either from disk or network */
1268 static bool nodemap_config_loaded;
1269 static DEFINE_MUTEX(nodemap_config_loaded_lock);
1270
1271 /**
1272  * Ensures that configs loaded over the wire are prioritized over those loaded
1273  * from disk.
1274  *
1275  * \param config        config to set as the active config
1276  */
1277 void nodemap_config_set_active_mgc(struct nodemap_config *config)
1278 {
1279         mutex_lock(&nodemap_config_loaded_lock);
1280         nodemap_config_set_active(config);
1281         nodemap_config_loaded = true;
1282         nodemap_save_all_caches();
1283         mutex_unlock(&nodemap_config_loaded_lock);
1284 }
1285 EXPORT_SYMBOL(nodemap_config_set_active_mgc);
1286
1287 /**
1288  * Register a dt_object representing the config index file. This should be
1289  * called by targets in order to load the nodemap configuration from disk. The
1290  * dt_object should be created with local_index_find_or_create and the index
1291  * features should be enabled with do_index_try.
1292  *
1293  * \param obj   dt_object returned by local_index_find_or_create
1294  *
1295  * \retval      on success: nm_config_file handle for later deregistration
1296  * \retval      -ENOMEM         memory allocation failure
1297  * \retval      -ENOENT         error loading nodemap config
1298  * \retval      -EINVAL         error loading nodemap config
1299  * \retval      -EEXIST         nodemap config already registered for MGS
1300  */
1301 struct nm_config_file *nm_config_file_register_mgs(const struct lu_env *env,
1302                                                    struct dt_object *obj,
1303                                                    struct local_oid_storage *los)
1304 {
1305         struct nm_config_file *ncf;
1306         int rc = 0;
1307         ENTRY;
1308
1309         if (nodemap_mgs_ncf != NULL)
1310                 GOTO(out, ncf = ERR_PTR(-EEXIST));
1311
1312         OBD_ALLOC_PTR(ncf);
1313         if (ncf == NULL)
1314                 GOTO(out, ncf = ERR_PTR(-ENOMEM));
1315
1316         /* if loading from cache, prevent activation of MGS config until cache
1317          * loading is done, so disk config is overwritten by MGS config.
1318          */
1319         mutex_lock(&nodemap_config_loaded_lock);
1320         rc = nodemap_load_entries(env, obj);
1321         if (!rc)
1322                 nodemap_config_loaded = true;
1323         mutex_unlock(&nodemap_config_loaded_lock);
1324
1325         if (rc) {
1326                 OBD_FREE_PTR(ncf);
1327                 GOTO(out, ncf = ERR_PTR(rc));
1328         }
1329
1330         lu_object_get(&obj->do_lu);
1331
1332         ncf->ncf_obj = obj;
1333         ncf->ncf_los = los;
1334
1335         nodemap_mgs_ncf = ncf;
1336
1337 out:
1338         return ncf;
1339 }
1340 EXPORT_SYMBOL(nm_config_file_register_mgs);
1341
1342 struct nm_config_file *nm_config_file_register_tgt(const struct lu_env *env,
1343                                                    struct dt_device *dev,
1344                                                    struct local_oid_storage *los)
1345 {
1346         struct nm_config_file *ncf;
1347         struct dt_object *config_obj = NULL;
1348         int rc = 0;
1349
1350         OBD_ALLOC_PTR(ncf);
1351         if (ncf == NULL)
1352                 RETURN(ERR_PTR(-ENOMEM));
1353
1354         /* don't load from cache if config already loaded */
1355         mutex_lock(&nodemap_config_loaded_lock);
1356         if (!nodemap_config_loaded) {
1357                 config_obj = nodemap_cache_find_create(env, dev, los, 0);
1358                 if (IS_ERR(config_obj))
1359                         rc = PTR_ERR(config_obj);
1360                 else
1361                         rc = nodemap_load_entries(env, config_obj);
1362
1363                 if (!rc)
1364                         nodemap_config_loaded = true;
1365         }
1366         mutex_unlock(&nodemap_config_loaded_lock);
1367         if (rc)
1368                 GOTO(out_ncf, rc);
1369
1370         /* sync on disk caches w/ loaded config in memory, ncf_obj may change */
1371         if (!config_obj) {
1372                 config_obj = nodemap_save_config_cache(env, dev, los);
1373                 if (IS_ERR(config_obj))
1374                         GOTO(out_ncf, rc = PTR_ERR(config_obj));
1375         }
1376
1377         ncf->ncf_obj = config_obj;
1378         ncf->ncf_los = los;
1379
1380         mutex_lock(&ncf_list_lock);
1381         list_add(&ncf->ncf_list, &ncf_list_head);
1382         mutex_unlock(&ncf_list_lock);
1383
1384 out_ncf:
1385         if (rc) {
1386                 OBD_FREE_PTR(ncf);
1387                 RETURN(ERR_PTR(rc));
1388         }
1389
1390         RETURN(ncf);
1391 }
1392 EXPORT_SYMBOL(nm_config_file_register_tgt);
1393
1394 /**
1395  * Deregister a nm_config_file. Should be called by targets during cleanup.
1396  *
1397  * \param ncf   config file to deregister
1398  */
1399 void nm_config_file_deregister_mgs(const struct lu_env *env,
1400                                    struct nm_config_file *ncf)
1401 {
1402         ENTRY;
1403         LASSERT(nodemap_mgs_ncf == ncf);
1404
1405         nodemap_mgs_ncf = NULL;
1406         if (ncf->ncf_obj)
1407                 dt_object_put(env, ncf->ncf_obj);
1408
1409         OBD_FREE_PTR(ncf);
1410
1411         EXIT;
1412 }
1413 EXPORT_SYMBOL(nm_config_file_deregister_mgs);
1414
1415 void nm_config_file_deregister_tgt(const struct lu_env *env,
1416                                    struct nm_config_file *ncf)
1417 {
1418         ENTRY;
1419
1420         if (ncf == NULL)
1421                 return;
1422
1423         mutex_lock(&ncf_list_lock);
1424         list_del(&ncf->ncf_list);
1425         mutex_unlock(&ncf_list_lock);
1426
1427         if (ncf->ncf_obj)
1428                 dt_object_put(env, ncf->ncf_obj);
1429
1430         OBD_FREE_PTR(ncf);
1431
1432         EXIT;
1433 }
1434 EXPORT_SYMBOL(nm_config_file_deregister_tgt);
1435
1436 int nodemap_process_idx_pages(struct nodemap_config *config, union lu_page *lip,
1437                               struct lu_nodemap **recent_nodemap)
1438 {
1439         struct nodemap_key *key;
1440         union nodemap_rec *rec;
1441         char *entry;
1442         int j;
1443         int k;
1444         int rc = 0;
1445         int size = dt_nodemap_features.dif_keysize_max +
1446                    dt_nodemap_features.dif_recsize_max;
1447         ENTRY;
1448
1449         for (j = 0; j < LU_PAGE_COUNT; j++) {
1450                 if (lip->lp_idx.lip_magic != LIP_MAGIC)
1451                         return -EINVAL;
1452
1453                 /* get and process keys and records from page */
1454                 for (k = 0; k < lip->lp_idx.lip_nr; k++) {
1455                         entry = lip->lp_idx.lip_entries + k * size;
1456                         key = (struct nodemap_key *)entry;
1457
1458                         entry += dt_nodemap_features.dif_keysize_max;
1459                         rec = (union nodemap_rec *)entry;
1460
1461                         rc = nodemap_process_keyrec(config, key, rec,
1462                                                     recent_nodemap);
1463                         if (rc < 0)
1464                                 return rc;
1465                 }
1466                 lip++;
1467         }
1468
1469         EXIT;
1470         return 0;
1471 }
1472 EXPORT_SYMBOL(nodemap_process_idx_pages);
1473
1474 static int nodemap_page_build(const struct lu_env *env, struct dt_object *obj,
1475                               union lu_page *lp, size_t bytes,
1476                               const struct dt_it_ops *iops,
1477                               struct dt_it *it, __u32 attr, void *arg)
1478 {
1479         struct idx_info *ii = (struct idx_info *)arg;
1480         struct lu_idxpage *lip = &lp->lp_idx;
1481         char *entry;
1482         size_t size = ii->ii_keysize + ii->ii_recsize;
1483         int rc;
1484         ENTRY;
1485
1486         if (bytes < LIP_HDR_SIZE)
1487                 return -EINVAL;
1488
1489         /* initialize the header of the new container */
1490         memset(lip, 0, LIP_HDR_SIZE);
1491         lip->lip_magic = LIP_MAGIC;
1492         bytes -= LIP_HDR_SIZE;
1493
1494         entry = lip->lip_entries;
1495         do {
1496                 char *tmp_entry = entry;
1497                 struct dt_key *key;
1498                 __u64 hash;
1499                 enum nodemap_idx_type key_type;
1500                 int sub_type;
1501
1502                 /* fetch 64-bit hash value */
1503                 hash = iops->store(env, it);
1504                 ii->ii_hash_end = hash;
1505
1506                 if (OBD_FAIL_CHECK(OBD_FAIL_OBD_IDX_READ_BREAK)) {
1507                         if (lip->lip_nr != 0)
1508                                 GOTO(out, rc = 0);
1509                 }
1510
1511                 if (bytes < size) {
1512                         if (lip->lip_nr == 0)
1513                                 GOTO(out, rc = -EINVAL);
1514                         GOTO(out, rc = 0);
1515                 }
1516
1517                 key = iops->key(env, it);
1518                 key_type = nodemap_get_key_type((struct nodemap_key *)key);
1519                 sub_type = nodemap_get_key_subtype((struct nodemap_key *)key);
1520
1521                 /* on the first pass, get only the cluster types. On second
1522                  * pass, get all the rest */
1523                 if ((ii->ii_attrs == NM_READ_CLUSTERS &&
1524                      key_type == NODEMAP_CLUSTER_IDX &&
1525                      sub_type == NODEMAP_CLUSTER_REC) ||
1526                     (ii->ii_attrs == NM_READ_ATTRIBUTES &&
1527                      (key_type != NODEMAP_CLUSTER_IDX ||
1528                       sub_type != NODEMAP_CLUSTER_REC) &&
1529                      key_type != NODEMAP_EMPTY_IDX)) {
1530                         memcpy(tmp_entry, key, ii->ii_keysize);
1531                         tmp_entry += ii->ii_keysize;
1532
1533                         /* and finally the record */
1534                         rc = iops->rec(env, it, (struct dt_rec *)tmp_entry,
1535                                        attr);
1536                         if (rc != -ESTALE) {
1537                                 if (rc != 0)
1538                                         GOTO(out, rc);
1539
1540                                 /* hash/key/record successfully copied! */
1541                                 lip->lip_nr++;
1542                                 if (unlikely(lip->lip_nr == 1 &&
1543                                     ii->ii_count == 0))
1544                                         ii->ii_hash_start = hash;
1545
1546                                 entry = tmp_entry + ii->ii_recsize;
1547                                 bytes -= size;
1548                         }
1549                 }
1550
1551                 /* move on to the next record */
1552                 do {
1553                         rc = iops->next(env, it);
1554                 } while (rc == -ESTALE);
1555
1556                 /* move to second pass */
1557                 if (rc > 0 && ii->ii_attrs == NM_READ_CLUSTERS) {
1558                         ii->ii_attrs = NM_READ_ATTRIBUTES;
1559                         rc = iops->load(env, it, 0);
1560                         if (rc == 0)
1561                                 rc = iops->next(env, it);
1562                         else if (rc > 0)
1563                                 rc = 0;
1564                         else
1565                                 GOTO(out, rc);
1566                 }
1567
1568         } while (rc == 0);
1569
1570         GOTO(out, rc);
1571 out:
1572         if (rc >= 0 && lip->lip_nr > 0)
1573                 /* one more container */
1574                 ii->ii_count++;
1575         if (rc > 0)
1576                 /* no more entries */
1577                 ii->ii_hash_end = II_END_OFF;
1578         return rc;
1579 }
1580
1581
1582 int nodemap_index_read(struct lu_env *env,
1583                        struct nm_config_file *ncf,
1584                        struct idx_info *ii,
1585                        const struct lu_rdpg *rdpg)
1586 {
1587         struct dt_object        *nodemap_idx = ncf->ncf_obj;
1588         __u64                    version;
1589         int                      rc = 0;
1590
1591         ii->ii_keysize = dt_nodemap_features.dif_keysize_max;
1592         ii->ii_recsize = dt_nodemap_features.dif_recsize_max;
1593
1594         dt_read_lock(env, nodemap_idx, 0);
1595         version = dt_version_get(env, nodemap_idx);
1596         if (rdpg->rp_hash != 0 && ii->ii_version != version) {
1597                 CDEBUG(D_INFO, "nodemap config changed inflight, old %llu, new %llu\n",
1598                        ii->ii_version,
1599                        version);
1600                 ii->ii_hash_end = 0;
1601         } else {
1602                 rc = dt_index_walk(env, nodemap_idx, rdpg, nodemap_page_build,
1603                                    ii);
1604                 CDEBUG(D_INFO, "walked index, hashend %llx\n", ii->ii_hash_end);
1605         }
1606
1607         if (rc >= 0)
1608                 ii->ii_version = version;
1609
1610         dt_read_unlock(env, nodemap_idx);
1611         return rc;
1612 }
1613 EXPORT_SYMBOL(nodemap_index_read);
1614
1615 /**
1616  * Returns the current nodemap configuration to MGC by walking the nodemap
1617  * config index and storing it in the response buffer.
1618  *
1619  * \param       req             incoming MGS_CONFIG_READ request
1620  * \retval      0               success
1621  * \retval      -EINVAL         malformed request
1622  * \retval      -ENOTCONN       client evicted/reconnected already
1623  * \retval      -ETIMEDOUT      client timeout or network error
1624  * \retval      -ENOMEM
1625  */
1626 int nodemap_get_config_req(struct obd_device *mgs_obd,
1627                            struct ptlrpc_request *req)
1628 {
1629         const struct ptlrpc_bulk_frag_ops *frag_ops = &ptlrpc_bulk_kiov_pin_ops;
1630         struct mgs_config_body *body;
1631         struct mgs_config_res *res;
1632         struct lu_rdpg rdpg;
1633         struct idx_info nodemap_ii;
1634         struct ptlrpc_bulk_desc *desc;
1635         struct tg_export_data *rqexp_ted = &req->rq_export->exp_target_data;
1636         int i;
1637         int page_count;
1638         int bytes = 0;
1639         int rc = 0;
1640
1641         body = req_capsule_client_get(&req->rq_pill, &RMF_MGS_CONFIG_BODY);
1642         if (!body)
1643                 RETURN(-EINVAL);
1644
1645         if (body->mcb_type != MGS_CFG_T_NODEMAP)
1646                 RETURN(-EINVAL);
1647
1648         rdpg.rp_count = (body->mcb_units << body->mcb_bits);
1649         rdpg.rp_npages = (rdpg.rp_count + PAGE_SIZE - 1) >>
1650                 PAGE_SHIFT;
1651         if (rdpg.rp_npages > PTLRPC_MAX_BRW_PAGES)
1652                 RETURN(-EINVAL);
1653
1654         CDEBUG(D_INFO, "reading nodemap log, name '%s', size = %u\n",
1655                body->mcb_name, rdpg.rp_count);
1656
1657         /* allocate pages to store the containers */
1658         OBD_ALLOC_PTR_ARRAY(rdpg.rp_pages, rdpg.rp_npages);
1659         if (rdpg.rp_pages == NULL)
1660                 RETURN(-ENOMEM);
1661         for (i = 0; i < rdpg.rp_npages; i++) {
1662                 rdpg.rp_pages[i] = alloc_page(GFP_NOFS);
1663                 if (rdpg.rp_pages[i] == NULL)
1664                         GOTO(out, rc = -ENOMEM);
1665         }
1666
1667         rdpg.rp_hash = body->mcb_offset;
1668         nodemap_ii.ii_magic = IDX_INFO_MAGIC;
1669         nodemap_ii.ii_flags = II_FL_NOHASH;
1670         nodemap_ii.ii_version = rqexp_ted->ted_nodemap_version;
1671         nodemap_ii.ii_attrs = body->mcb_nm_cur_pass;
1672
1673         bytes = nodemap_index_read(req->rq_svc_thread->t_env,
1674                                    obd2obt(mgs_obd)->obt_nodemap_config_file,
1675                                    &nodemap_ii, &rdpg);
1676         if (bytes < 0)
1677                 GOTO(out, rc = bytes);
1678
1679         rqexp_ted->ted_nodemap_version = nodemap_ii.ii_version;
1680
1681         res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES);
1682         if (res == NULL)
1683                 GOTO(out, rc = -EINVAL);
1684         res->mcr_offset = nodemap_ii.ii_hash_end;
1685         res->mcr_nm_cur_pass = nodemap_ii.ii_attrs;
1686
1687         page_count = (bytes + PAGE_SIZE - 1) >> PAGE_SHIFT;
1688         LASSERT(page_count <= rdpg.rp_count);
1689         desc = ptlrpc_prep_bulk_exp(req, page_count, 1,
1690                                     PTLRPC_BULK_PUT_SOURCE,
1691                                     MGS_BULK_PORTAL, frag_ops);
1692         if (desc == NULL)
1693                 GOTO(out, rc = -ENOMEM);
1694
1695         for (i = 0; i < page_count && bytes > 0; i++) {
1696                 frag_ops->add_kiov_frag(desc, rdpg.rp_pages[i], 0,
1697                                         min_t(int, bytes, PAGE_SIZE));
1698                 bytes -= PAGE_SIZE;
1699         }
1700
1701         rc = target_bulk_io(req->rq_export, desc);
1702         ptlrpc_free_bulk(desc);
1703
1704 out:
1705         if (rdpg.rp_pages != NULL) {
1706                 for (i = 0; i < rdpg.rp_npages; i++)
1707                         if (rdpg.rp_pages[i] != NULL)
1708                                 __free_page(rdpg.rp_pages[i]);
1709                 OBD_FREE_PTR_ARRAY(rdpg.rp_pages, rdpg.rp_npages);
1710         }
1711         return rc;
1712 }
1713 EXPORT_SYMBOL(nodemap_get_config_req);