Whamcloud - gitweb
LU-17431 nodemap: determine if nodemap is currently loading
[fs/lustre-release.git] / lustre / ptlrpc / nodemap_storage.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (C) 2015, Trustees of Indiana University
24  *
25  * Copyright (c) 2017, Intel Corporation.
26  *
27  * Author: Joshua Walgenbach <jjw@iu.edu>
28  * Author: Kit Westneat <cwestnea@iu.edu>
29  *
30  * Implements the storage functionality for the nodemap configuration. Functions
31  * in this file prepare, store, and load nodemap configuration data. Targets
32  * using nodemap services should register a configuration file object. Nodemap
33  * configuration changes that need to persist should call the appropriate
34  * storage function for the data being modified.
35  *
36  * There are several index types as defined in enum nodemap_idx_type:
37  *      NODEMAP_CLUSTER_IDX     stores the data found on the lu_nodemap struct,
38  *                              like root squash and config flags, as well as
39  *                              the name.
40  *      NODEMAP_RANGE_IDX       stores NID range information for a nodemap
41  *      NODEMAP_UIDMAP_IDX      stores a fs/client UID mapping pair
42  *      NODEMAP_GIDMAP_IDX      stores a fs/client GID mapping pair
43  *      NODEMAP_GLOBAL_IDX      stores whether or not nodemaps are active
44  */
45
46 #include <libcfs/libcfs.h>
47 #include <linux/err.h>
48 #include <linux/kernel.h>
49 #include <linux/list.h>
50 #include <linux/mutex.h>
51 #include <linux/string.h>
52 #include <linux/types.h>
53 #include <uapi/linux/lnet/lnet-types.h>
54 #include <uapi/linux/lustre/lustre_idl.h>
55 #include <uapi/linux/lustre/lustre_ioctl.h>
56 #include <uapi/linux/lustre/lustre_disk.h>
57 #include <dt_object.h>
58 #include <lu_object.h>
59 #include <lustre_net.h>
60 #include <lustre_nodemap.h>
61 #include <obd_class.h>
62 #include <obd_support.h>
63 #include "nodemap_internal.h"
64
/*
 * List of registered nodemap index files, except the MGS one, which is
 * tracked separately through nodemap_mgs_ncf.
 */
static LIST_HEAD(ncf_list_head);
/* NOTE(review): presumably serializes access to ncf_list_head - confirm */
static DEFINE_MUTEX(ncf_list_lock);

/*
 * MGS index is different than others, others are listeners to MGS idx.
 * nodemap_mgs() reports whether this pointer is set; the storage functions
 * in this file use its ncf_obj as the index object to modify.
 */
static struct nm_config_file *nodemap_mgs_ncf;
71
72 bool nodemap_mgs(void)
73 {
74         return (nodemap_mgs_ncf != NULL);
75 }
76
77 static void nodemap_cluster_key_init(struct nodemap_key *nk, unsigned int nm_id,
78                                      enum nodemap_cluster_rec_subid subid)
79 {
80         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id,
81                                                         NODEMAP_CLUSTER_IDX));
82         nk->nk_cluster_subid = subid;
83 }
84
85 static void nodemap_cluster_rec_init(union nodemap_rec *nr,
86                                      const struct lu_nodemap *nodemap)
87 {
88         BUILD_BUG_ON(sizeof(nr->ncr.ncr_name) != sizeof(nodemap->nm_name));
89
90         strncpy(nr->ncr.ncr_name, nodemap->nm_name, sizeof(nr->ncr.ncr_name));
91         nr->ncr.ncr_squash_uid = cpu_to_le32(nodemap->nm_squash_uid);
92         nr->ncr.ncr_squash_gid = cpu_to_le32(nodemap->nm_squash_gid);
93         nr->ncr.ncr_squash_projid = cpu_to_le32(nodemap->nm_squash_projid);
94         nr->ncr.ncr_flags =
95                 (nodemap->nmf_trust_client_ids ?
96                         NM_FL_TRUST_CLIENT_IDS : 0) |
97                 (nodemap->nmf_allow_root_access ?
98                         NM_FL_ALLOW_ROOT_ACCESS : 0) |
99                 (nodemap->nmf_deny_unknown ?
100                         NM_FL_DENY_UNKNOWN : 0) |
101                 (nodemap->nmf_map_mode & NODEMAP_MAP_UID ?
102                         NM_FL_MAP_UID : 0) |
103                 (nodemap->nmf_map_mode & NODEMAP_MAP_GID ?
104                         NM_FL_MAP_GID : 0) |
105                 (nodemap->nmf_map_mode & NODEMAP_MAP_PROJID ?
106                         NM_FL_MAP_PROJID : 0) |
107                 (nodemap->nmf_enable_audit ?
108                         NM_FL_ENABLE_AUDIT : 0) |
109                 (nodemap->nmf_forbid_encryption ?
110                         NM_FL_FORBID_ENCRYPT : 0);
111         nr->ncr.ncr_flags2 =
112                 (nodemap->nmf_readonly_mount ?
113                         NM_FL2_READONLY_MOUNT : 0);
114 }
115
116 static void nodemap_cluster_roles_rec_init(union nodemap_rec *nr,
117                                            const struct lu_nodemap *nodemap)
118 {
119         struct nodemap_cluster_roles_rec *ncrr = &nr->ncrr;
120
121         memset(ncrr, 0, sizeof(struct nodemap_cluster_roles_rec));
122         ncrr->ncrr_roles = cpu_to_le64(nodemap->nmf_rbac);
123 }
124
125 static void nodemap_idmap_key_init(struct nodemap_key *nk, unsigned int nm_id,
126                                    enum nodemap_id_type id_type,
127                                    u32 id_client)
128 {
129         enum nodemap_idx_type idx_type;
130
131         if (id_type == NODEMAP_UID)
132                 idx_type = NODEMAP_UIDMAP_IDX;
133         else if (id_type == NODEMAP_GID)
134                 idx_type = NODEMAP_GIDMAP_IDX;
135         else if (id_type == NODEMAP_PROJID)
136                 idx_type = NODEMAP_PROJIDMAP_IDX;
137         else
138                 idx_type = NODEMAP_EMPTY_IDX;
139
140         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id, idx_type));
141         nk->nk_id_client = cpu_to_le32(id_client);
142 }
143
144 static void nodemap_idmap_rec_init(union nodemap_rec *nr, u32 id_fs)
145 {
146         nr->nir.nir_id_fs = cpu_to_le32(id_fs);
147 }
148
149 static void nodemap_range_key_init(struct nodemap_key *nk,
150                                    enum nodemap_idx_type type,
151                                    unsigned int nm_id, unsigned int rn_id)
152 {
153         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(nm_id, type));
154         nk->nk_range_id = cpu_to_le32(rn_id);
155 }
156
157 static int nodemap_range_rec_init(union nodemap_rec *nr,
158                                   const struct lu_nid_range *range)
159 {
160         if (range->rn_netmask) {
161                 nr->nrr2.nrr_nid_prefix = range->rn_start;
162                 nr->nrr2.nrr_netmask = range->rn_netmask;
163
164                 if (NID_BYTES(&nr->nrr2.nrr_nid_prefix) >
165                     sizeof(struct lnet_nid))
166                         return -E2BIG;
167         } else {
168                 lnet_nid_t nid4[2];
169
170                 if (!nid_is_nid4(&range->rn_start) ||
171                     !nid_is_nid4(&range->rn_end))
172                         return -EINVAL;
173
174                 nid4[0] = lnet_nid_to_nid4(&range->rn_start);
175                 nid4[1] = lnet_nid_to_nid4(&range->rn_end);
176                 nr->nrr.nrr_start_nid = cpu_to_le64(nid4[0]);
177                 nr->nrr.nrr_end_nid = cpu_to_le64(nid4[1]);
178         }
179
180         return 0;
181 }
182
183 static void nodemap_global_key_init(struct nodemap_key *nk)
184 {
185         nk->nk_nodemap_id = cpu_to_le32(nm_idx_set_type(0, NODEMAP_GLOBAL_IDX));
186         nk->nk_unused = 0;
187 }
188
189 static void nodemap_global_rec_init(union nodemap_rec *nr, bool active)
190 {
191         nr->ngr.ngr_is_active = active;
192 }
193
194 /* should be called with dt_write lock */
195 static void nodemap_inc_version(const struct lu_env *env,
196                                 struct dt_object *nodemap_idx,
197                                 struct thandle *th)
198 {
199         u64 ver = dt_version_get(env, nodemap_idx);
200         dt_version_set(env, nodemap_idx, ver + 1, th);
201 }
202
/* Mode argument for nodemap_cache_find_create(). */
enum ncfc_find_create {
        /* remove any existing nodemap index and create a fresh one */
        NCFC_CREATE_NEW = 1,
};
206
/**
 * Find the nodemap index object LUSTRE_NODEMAP_NAME under the device root,
 * creating it if needed.
 *
 * With \a create_new == NCFC_CREATE_NEW an existing index is unlinked and a
 * new one is created. Without it, the function switches to that mode by
 * itself if the index found on disk cannot be set up as an index.
 *
 * \param       env             execution environment
 * \param       dev             device holding the index
 * \param       los             local OID storage used to find/create the object
 * \param       create_new      NCFC_CREATE_NEW to force a fresh index
 *
 * \retval      index object on success; the reference obtained here is not
 *              dropped (NOTE(review): presumably the caller releases it with
 *              dt_object_put() - confirm)
 * \retval      ERR_PTR(-EROFS) device is read-only and the index is missing
 *              or would have to be re-created
 * \retval      ERR_PTR(-ve)    other lookup, unlink, create or index errors
 */
static struct dt_object *nodemap_cache_find_create(const struct lu_env *env,
                                                   struct dt_device *dev,
                                                   struct local_oid_storage *los,
                                                   enum ncfc_find_create create_new)
{
        struct lu_fid tfid;
        struct dt_object *root_obj;
        struct dt_object *nm_obj;
        int rc = 0;

        rc = dt_root_get(env, dev, &tfid);
        if (rc < 0)
                GOTO(out, nm_obj = ERR_PTR(rc));

        root_obj = dt_locate(env, dev, &tfid);
        if (unlikely(IS_ERR(root_obj)))
                GOTO(out, nm_obj = root_obj);

        /* on success tfid is overwritten with the FID of the existing index;
         * rc keeps the lookup result for the checks below
         */
        rc = dt_lookup_dir(env, root_obj, LUSTRE_NODEMAP_NAME, &tfid);
        if (rc == -ENOENT) {
                /* nothing on disk and nothing may be created */
                if (dev->dd_rdonly)
                        GOTO(out_root, nm_obj = ERR_PTR(-EROFS));
        } else if (rc) {
                GOTO(out_root, nm_obj = ERR_PTR(rc));
        } else if (dev->dd_rdonly && create_new == NCFC_CREATE_NEW) {
                /* re-creating would need to unlink the existing index */
                GOTO(out_root, nm_obj = ERR_PTR(-EROFS));
        }

again:
        /* if loading index fails the first time, create new index */
        if (create_new == NCFC_CREATE_NEW && rc != -ENOENT) {
                CDEBUG(D_INFO, "removing old index, creating new one\n");
                rc = local_object_unlink(env, dev, root_obj,
                                         LUSTRE_NODEMAP_NAME);
                if (rc < 0) {
                        /* XXX not sure the best way to get obd name. */
                        CERROR("cannot destroy nodemap index: rc = %d\n",
                               rc);
                        GOTO(out_root, nm_obj = ERR_PTR(rc));
                }
        }

retry:
        nm_obj = local_index_find_or_create(env, los, root_obj,
                                                LUSTRE_NODEMAP_NAME,
                                                S_IFREG | S_IRUGO | S_IWUSR,
                                                &dt_nodemap_features);
        if (IS_ERR(nm_obj)) {
                /* -EEXIST while the last allocated OID is behind the OID
                 * recorded in tfid: flag the device for scrub, move the
                 * allocator up to just below that OID and try again
                 */
                if (PTR_ERR(nm_obj) == -EEXIST && rc != -ENOENT &&
                    los->los_last_oid < (tfid.f_oid - 1)) {
                        if (dt2lu_dev(dev)->ld_obd)
                                dt2lu_dev(dev)->ld_obd->obd_need_scrub = 1;

                        mutex_lock(&los->los_id_lock);
                        los->los_last_oid = tfid.f_oid - 1;
                        mutex_unlock(&los->los_id_lock);

                        goto retry;
                }

                GOTO(out_root, nm_obj);
        }

        if (nm_obj->do_index_ops == NULL) {
                rc = nm_obj->do_ops->do_index_try(env, nm_obj,
                                                      &dt_nodemap_features);
                /* even if loading from tgt fails, connecting to MGS will
                 * rewrite the config
                 */
                if (rc < 0) {
                        /* drop the unusable object before failing/retrying */
                        dt_object_put(env, nm_obj);

                        /* already tried with a fresh index: give up */
                        if (create_new == NCFC_CREATE_NEW)
                                GOTO(out_root, nm_obj = ERR_PTR(rc));

                        CERROR("cannot load nodemap index from disk, creating "
                               "new index: rc = %d\n", rc);
                        create_new = NCFC_CREATE_NEW;
                        goto again;
                }
        }

out_root:
        dt_object_put(env, root_obj);
out:
        return nm_obj;
}
294
295 static int nodemap_idx_insert(const struct lu_env *env,
296                               struct dt_object *idx,
297                               const struct nodemap_key *nk,
298                               const union nodemap_rec *nr)
299 {
300         struct thandle *th;
301         struct dt_device *dev = lu2dt_dev(idx->do_lu.lo_dev);
302         int rc;
303
304         BUILD_BUG_ON(sizeof(union nodemap_rec) != 32);
305
306         th = dt_trans_create(env, dev);
307
308         if (IS_ERR(th))
309                 GOTO(out, rc = PTR_ERR(th));
310
311         rc = dt_declare_insert(env, idx,
312                                (const struct dt_rec *)nr,
313                                (const struct dt_key *)nk, th);
314         if (rc != 0)
315                 GOTO(out, rc);
316
317         rc = dt_declare_version_set(env, idx, th);
318         if (rc != 0)
319                 GOTO(out, rc);
320
321         rc = dt_trans_start_local(env, dev, th);
322         if (rc != 0)
323                 GOTO(out, rc);
324
325         dt_write_lock(env, idx, 0);
326
327         rc = dt_insert(env, idx, (const struct dt_rec *)nr,
328                        (const struct dt_key *)nk, th);
329
330         nodemap_inc_version(env, idx, th);
331         dt_write_unlock(env, idx);
332 out:
333         dt_trans_stop(env, dev, th);
334
335         return rc;
336 }
337
338 static int nodemap_idx_update(const struct lu_env *env,
339                               struct dt_object *idx,
340                               const struct nodemap_key *nk,
341                               const union nodemap_rec *nr)
342 {
343         struct thandle          *th;
344         struct dt_device        *dev = lu2dt_dev(idx->do_lu.lo_dev);
345         int                      rc = 0;
346
347         th = dt_trans_create(env, dev);
348
349         if (IS_ERR(th))
350                 GOTO(out, rc = PTR_ERR(th));
351
352         rc = dt_declare_delete(env, idx, (const struct dt_key *)nk, th);
353         if (rc != 0)
354                 GOTO(out, rc);
355
356         rc = dt_declare_insert(env, idx, (const struct dt_rec *)nr,
357                                (const struct dt_key *)nk, th);
358         if (rc != 0)
359                 GOTO(out, rc);
360
361         rc = dt_declare_version_set(env, idx, th);
362         if (rc != 0)
363                 GOTO(out, rc);
364
365         rc = dt_trans_start_local(env, dev, th);
366         if (rc != 0)
367                 GOTO(out, rc);
368
369         dt_write_lock(env, idx, 0);
370
371         rc = dt_delete(env, idx, (const struct dt_key *)nk, th);
372         if (rc != 0)
373                 GOTO(out_lock, rc);
374
375         rc = dt_insert(env, idx, (const struct dt_rec *)nr,
376                        (const struct dt_key *)nk, th);
377         if (rc != 0)
378                 GOTO(out_lock, rc);
379
380         nodemap_inc_version(env, idx, th);
381 out_lock:
382         dt_write_unlock(env, idx);
383 out:
384         dt_trans_stop(env, dev, th);
385
386         return rc;
387 }
388
389 static int nodemap_idx_delete(const struct lu_env *env,
390                               struct dt_object *idx,
391                               const struct nodemap_key *nk,
392                               const union nodemap_rec *unused)
393 {
394         struct thandle          *th;
395         struct dt_device        *dev = lu2dt_dev(idx->do_lu.lo_dev);
396         int                      rc = 0;
397
398         th = dt_trans_create(env, dev);
399
400         if (IS_ERR(th))
401                 GOTO(out, rc = PTR_ERR(th));
402
403         rc = dt_declare_delete(env, idx, (const struct dt_key *)nk, th);
404         if (rc != 0)
405                 GOTO(out, rc);
406
407         rc = dt_declare_version_set(env, idx, th);
408         if (rc != 0)
409                 GOTO(out, rc);
410
411         rc = dt_trans_start_local(env, dev, th);
412         if (rc != 0)
413                 GOTO(out, rc);
414
415         dt_write_lock(env, idx, 0);
416
417         rc = dt_delete(env, idx, (const struct dt_key *)nk, th);
418
419         nodemap_inc_version(env, idx, th);
420
421         dt_write_unlock(env, idx);
422 out:
423         dt_trans_stop(env, dev, th);
424
425         return rc;
426 }
427
/* Selects how a record is written by the *_add_update() helpers. */
enum nm_add_update {
        /* write via nodemap_idx_insert() */
        NM_ADD = 0,
        /* write via nodemap_idx_update() */
        NM_UPDATE = 1,
};
432
433 static int nodemap_idx_cluster_add_update(const struct lu_nodemap *nodemap,
434                                           struct dt_object *idx,
435                                           enum nm_add_update update,
436                                           enum nodemap_cluster_rec_subid subid)
437 {
438         struct nodemap_key nk;
439         union nodemap_rec nr;
440         struct lu_env env;
441         int rc = 0;
442
443         ENTRY;
444
445         if (idx == NULL) {
446                 if (!nodemap_mgs()) {
447                         CERROR("cannot add nodemap config to non-existing MGS.\n");
448                         return -EINVAL;
449                 }
450                 idx = nodemap_mgs_ncf->ncf_obj;
451         }
452
453         rc = lu_env_init(&env, LCT_LOCAL);
454         if (rc)
455                 RETURN(rc);
456
457         nodemap_cluster_key_init(&nk, nodemap->nm_id, subid);
458         switch (subid) {
459         case NODEMAP_CLUSTER_REC:
460                 nodemap_cluster_rec_init(&nr, nodemap);
461                 break;
462         case NODEMAP_CLUSTER_ROLES:
463                 nodemap_cluster_roles_rec_init(&nr, nodemap);
464                 break;
465         default:
466                 CWARN("%s: unknown subtype %u\n", nodemap->nm_name, subid);
467                 GOTO(fini, rc = -EINVAL);
468         }
469
470         if (update == NM_UPDATE)
471                 rc = nodemap_idx_update(&env, idx, &nk, &nr);
472         else
473                 rc = nodemap_idx_insert(&env, idx, &nk, &nr);
474
475 fini:
476         lu_env_fini(&env);
477         RETURN(rc);
478 }
479
480 int nodemap_idx_nodemap_add(const struct lu_nodemap *nodemap)
481 {
482         return nodemap_idx_cluster_add_update(nodemap, NULL,
483                                               NM_ADD, NODEMAP_CLUSTER_REC);
484 }
485
486 int nodemap_idx_nodemap_update(const struct lu_nodemap *nodemap)
487 {
488         return nodemap_idx_cluster_add_update(nodemap, NULL,
489                                               NM_UPDATE, NODEMAP_CLUSTER_REC);
490 }
491
492 int nodemap_idx_nodemap_del(const struct lu_nodemap *nodemap)
493 {
494         struct rb_root           root;
495         struct lu_idmap         *idmap;
496         struct lu_idmap         *temp;
497         struct lu_nid_range     *range;
498         struct lu_nid_range     *range_temp;
499         struct nodemap_key       nk;
500         struct lu_env            env;
501         int                      rc = 0;
502         int                      rc2 = 0;
503
504         ENTRY;
505         if (!nodemap_mgs()) {
506                 CERROR("cannot add nodemap config to non-existing MGS.\n");
507                 return -EINVAL;
508         }
509
510         rc = lu_env_init(&env, LCT_LOCAL);
511         if (rc != 0)
512                 RETURN(rc);
513
514         nodemap_cluster_key_init(&nk, nodemap->nm_id, NODEMAP_CLUSTER_ROLES);
515         rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
516         if (rc2 < 0 && rc2 != -ENOENT)
517                 rc = rc2;
518
519         root = nodemap->nm_fs_to_client_uidmap;
520         rbtree_postorder_for_each_entry_safe(idmap, temp, &root,
521                                              id_fs_to_client) {
522                 nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_UID,
523                                        idmap->id_client);
524                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
525                                          &nk, NULL);
526                 if (rc2 < 0)
527                         rc = rc2;
528         }
529
530         root = nodemap->nm_client_to_fs_gidmap;
531         rbtree_postorder_for_each_entry_safe(idmap, temp, &root,
532                                              id_client_to_fs) {
533                 nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_GID,
534                                        idmap->id_client);
535                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
536                                          &nk, NULL);
537                 if (rc2 < 0)
538                         rc = rc2;
539         }
540
541         root = nodemap->nm_client_to_fs_projidmap;
542         rbtree_postorder_for_each_entry_safe(idmap, temp, &root,
543                                              id_client_to_fs) {
544                 nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_PROJID,
545                                        idmap->id_client);
546                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
547                                          &nk, NULL);
548                 if (rc2 < 0)
549                         rc = rc2;
550         }
551
552         list_for_each_entry_safe(range, range_temp, &nodemap->nm_ranges,
553                                  rn_list) {
554                 enum nodemap_idx_type type;
555
556                 type = range->rn_netmask ? NODEMAP_NID_MASK_IDX :
557                                            NODEMAP_RANGE_IDX;
558                 nodemap_range_key_init(&nk, type, nodemap->nm_id, range->rn_id);
559                 rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj,
560                                          &nk, NULL);
561                 if (rc2 < 0)
562                         rc = rc2;
563         }
564
565         nodemap_cluster_key_init(&nk, nodemap->nm_id, NODEMAP_CLUSTER_REC);
566         rc2 = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
567         if (rc2 < 0)
568                 rc = rc2;
569
570         lu_env_fini(&env);
571
572         RETURN(rc);
573 }
574
575 int nodemap_idx_cluster_roles_add(const struct lu_nodemap *nodemap)
576 {
577         return nodemap_idx_cluster_add_update(nodemap, NULL, NM_ADD,
578                                               NODEMAP_CLUSTER_ROLES);
579 }
580
581 int nodemap_idx_cluster_roles_update(const struct lu_nodemap *nodemap)
582 {
583         return nodemap_idx_cluster_add_update(nodemap, NULL, NM_UPDATE,
584                                               NODEMAP_CLUSTER_ROLES);
585 }
586
587 int nodemap_idx_cluster_roles_del(const struct lu_nodemap *nodemap)
588 {
589         struct nodemap_key nk;
590         struct lu_env env;
591         int rc = 0;
592
593         ENTRY;
594
595         if (!nodemap_mgs()) {
596                 CERROR("cannot add nodemap config to non-existing MGS.\n");
597                 return -EINVAL;
598         }
599
600         rc = lu_env_init(&env, LCT_LOCAL);
601         if (rc != 0)
602                 RETURN(rc);
603
604         nodemap_cluster_key_init(&nk, nodemap->nm_id, NODEMAP_CLUSTER_ROLES);
605         rc = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
606
607         lu_env_fini(&env);
608         RETURN(rc);
609 }
610
611 int nodemap_idx_range_add(const struct lu_nid_range *range)
612 {
613         struct nodemap_key nk;
614         union nodemap_rec nr;
615         struct lu_env env;
616         int rc = 0;
617
618         ENTRY;
619         if (!nodemap_mgs()) {
620                 CERROR("cannot add nodemap config to non-existing MGS.\n");
621                 return -EINVAL;
622         }
623
624         rc = lu_env_init(&env, LCT_LOCAL);
625         if (rc != 0)
626                 RETURN(rc);
627
628         nodemap_range_key_init(&nk, range->rn_netmask ? NODEMAP_NID_MASK_IDX :
629                                                         NODEMAP_RANGE_IDX,
630                                range->rn_nodemap->nm_id, range->rn_id);
631         rc = nodemap_range_rec_init(&nr, range);
632         if (rc < 0)
633                 goto free_env;
634
635         rc = nodemap_idx_insert(&env, nodemap_mgs_ncf->ncf_obj, &nk, &nr);
636 free_env:
637         lu_env_fini(&env);
638
639         RETURN(rc);
640 }
641
642 int nodemap_idx_range_del(const struct lu_nid_range *range)
643 {
644         struct nodemap_key       nk;
645         struct lu_env            env;
646         int                      rc = 0;
647         ENTRY;
648
649         if (!nodemap_mgs()) {
650                 CERROR("cannot del nodemap config from non-existing MGS.\n");
651                 return -EINVAL;
652         }
653
654         rc = lu_env_init(&env, LCT_LOCAL);
655         if (rc != 0)
656                 RETURN(rc);
657
658         nodemap_range_key_init(&nk, range->rn_netmask ? NODEMAP_NID_MASK_IDX :
659                                                         NODEMAP_RANGE_IDX,
660                                range->rn_nodemap->nm_id, range->rn_id);
661         rc = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
662         lu_env_fini(&env);
663
664         RETURN(rc);
665 }
666
667 int nodemap_idx_idmap_add(const struct lu_nodemap *nodemap,
668                           enum nodemap_id_type id_type,
669                           const u32 map[2])
670 {
671         struct nodemap_key       nk;
672         union nodemap_rec        nr;
673         struct lu_env            env;
674         int                      rc = 0;
675         ENTRY;
676
677         if (!nodemap_mgs()) {
678                 CERROR("cannot add nodemap config to non-existing MGS.\n");
679                 return -EINVAL;
680         }
681
682         rc = lu_env_init(&env, LCT_LOCAL);
683         if (rc != 0)
684                 RETURN(rc);
685
686         nodemap_idmap_key_init(&nk, nodemap->nm_id, id_type, map[0]);
687         nodemap_idmap_rec_init(&nr, map[1]);
688
689         rc = nodemap_idx_insert(&env, nodemap_mgs_ncf->ncf_obj, &nk, &nr);
690         lu_env_fini(&env);
691
692         RETURN(rc);
693 }
694
695 int nodemap_idx_idmap_del(const struct lu_nodemap *nodemap,
696                           enum nodemap_id_type id_type,
697                           const u32 map[2])
698 {
699         struct nodemap_key       nk;
700         struct lu_env            env;
701         int                      rc = 0;
702         ENTRY;
703
704         if (!nodemap_mgs()) {
705                 CERROR("cannot add nodemap config to non-existing MGS.\n");
706                 return -EINVAL;
707         }
708
709         rc = lu_env_init(&env, LCT_LOCAL);
710         if (rc != 0)
711                 RETURN(rc);
712
713         nodemap_idmap_key_init(&nk, nodemap->nm_id, id_type, map[0]);
714
715         rc = nodemap_idx_delete(&env, nodemap_mgs_ncf->ncf_obj, &nk, NULL);
716         lu_env_fini(&env);
717
718         RETURN(rc);
719 }
720
721 static int nodemap_idx_global_add_update(bool value, enum nm_add_update update)
722 {
723         struct nodemap_key       nk;
724         union nodemap_rec        nr;
725         struct lu_env            env;
726         int                      rc = 0;
727         ENTRY;
728
729         if (!nodemap_mgs()) {
730                 CERROR("cannot add nodemap config to non-existing MGS.\n");
731                 return -EINVAL;
732         }
733
734         rc = lu_env_init(&env, LCT_LOCAL);
735         if (rc != 0)
736                 RETURN(rc);
737
738         nodemap_global_key_init(&nk);
739         nodemap_global_rec_init(&nr, value);
740
741         if (update == NM_UPDATE)
742                 rc = nodemap_idx_update(&env, nodemap_mgs_ncf->ncf_obj,
743                                         &nk, &nr);
744         else
745                 rc = nodemap_idx_insert(&env, nodemap_mgs_ncf->ncf_obj,
746                                         &nk, &nr);
747
748         lu_env_fini(&env);
749
750         RETURN(rc);
751 }
752
753 int nodemap_idx_nodemap_activate(bool value)
754 {
755         return nodemap_idx_global_add_update(value, NM_UPDATE);
756 }
757
758 static enum nodemap_idx_type nodemap_get_key_type(const struct nodemap_key *key)
759 {
760         u32                      nodemap_id;
761
762         nodemap_id = le32_to_cpu(key->nk_nodemap_id);
763         return nm_idx_get_type(nodemap_id);
764 }
765
766 static int nodemap_get_key_subtype(const struct nodemap_key *key)
767 {
768         enum nodemap_idx_type type = nodemap_get_key_type(key);
769
770         return type == NODEMAP_CLUSTER_IDX ? key->nk_cluster_subid : -1;
771 }
772
773 static int nodemap_cluster_rec_helper(struct nodemap_config *config,
774                                       u32 nodemap_id,
775                                       const union nodemap_rec *rec,
776                                       struct lu_nodemap **recent_nodemap)
777 {
778         struct lu_nodemap *nodemap, *old_nm;
779         enum nm_flag_bits flags;
780         enum nm_flag2_bits flags2;
781
782         nodemap = cfs_hash_lookup(config->nmc_nodemap_hash, rec->ncr.ncr_name);
783         if (nodemap == NULL) {
784                 if (nodemap_id == LUSTRE_NODEMAP_DEFAULT_ID)
785                         nodemap = nodemap_create(rec->ncr.ncr_name, config, 1);
786                 else
787                         nodemap = nodemap_create(rec->ncr.ncr_name, config, 0);
788                 if (IS_ERR(nodemap))
789                         return PTR_ERR(nodemap);
790
791                 /* we need to override the local ID with the saved ID */
792                 nodemap->nm_id = nodemap_id;
793                 if (nodemap_id > config->nmc_nodemap_highest_id)
794                         config->nmc_nodemap_highest_id = nodemap_id;
795
796         } else if (nodemap->nm_id != nodemap_id) {
797                 nodemap_putref(nodemap);
798                 return -EINVAL;
799         }
800
801         nodemap->nm_squash_uid = le32_to_cpu(rec->ncr.ncr_squash_uid);
802         nodemap->nm_squash_gid = le32_to_cpu(rec->ncr.ncr_squash_gid);
803         nodemap->nm_squash_projid = le32_to_cpu(rec->ncr.ncr_squash_projid);
804
805         flags = rec->ncr.ncr_flags;
806         nodemap->nmf_allow_root_access = flags & NM_FL_ALLOW_ROOT_ACCESS;
807         nodemap->nmf_trust_client_ids = flags & NM_FL_TRUST_CLIENT_IDS;
808         nodemap->nmf_deny_unknown = flags & NM_FL_DENY_UNKNOWN;
809         nodemap->nmf_map_mode =
810                 (flags & NM_FL_MAP_UID ? NODEMAP_MAP_UID : 0) |
811                 (flags & NM_FL_MAP_GID ? NODEMAP_MAP_GID : 0) |
812                 (flags & NM_FL_MAP_PROJID ? NODEMAP_MAP_PROJID : 0);
813         if (nodemap->nmf_map_mode == NODEMAP_MAP_BOTH_LEGACY)
814                 nodemap->nmf_map_mode = NODEMAP_MAP_BOTH;
815         nodemap->nmf_enable_audit = flags & NM_FL_ENABLE_AUDIT;
816         nodemap->nmf_forbid_encryption = flags & NM_FL_FORBID_ENCRYPT;
817         flags2 = rec->ncr.ncr_flags2;
818         nodemap->nmf_readonly_mount = flags2 & NM_FL2_READONLY_MOUNT;
819         /* by default, and in the absence of cluster_roles, grant all roles */
820         nodemap->nmf_rbac = NODEMAP_RBAC_ALL;
821
822         /* The fileset should be saved otherwise it will be empty
823          * every time in case of "NODEMAP_CLUSTER_IDX".
824          */
825         mutex_lock(&active_config_lock);
826         old_nm = nodemap_lookup(rec->ncr.ncr_name);
827         if (!IS_ERR(old_nm) && old_nm->nm_fileset[0] != '\0')
828                 strscpy(nodemap->nm_fileset, old_nm->nm_fileset,
829                         sizeof(nodemap->nm_fileset));
830         mutex_unlock(&active_config_lock);
831         if (!IS_ERR(old_nm))
832                 nodemap_putref(old_nm);
833
834         if (*recent_nodemap == NULL) {
835                 *recent_nodemap = nodemap;
836                 INIT_LIST_HEAD(&nodemap->nm_list);
837         } else {
838                 list_add(&nodemap->nm_list, &(*recent_nodemap)->nm_list);
839         }
840         nodemap_putref(nodemap);
841
842         return 0;
843 }
844
845 static int nodemap_cluster_roles_helper(struct lu_nodemap *nodemap,
846                                         const union nodemap_rec *rec)
847 {
848         nodemap->nmf_rbac = le64_to_cpu(rec->ncrr.ncrr_roles);
849
850         return 0;
851 }
852
853 /**
854  * Process a key/rec pair and modify the new configuration.
855  *
856  * \param       config          configuration to update with this key/rec data
857  * \param       key             key of the record that was loaded
858  * \param       rec             record that was loaded
859  * \param       recent_nodemap  last referenced nodemap
860  * \retval      type of record processed, see enum #nodemap_idx_type
861  * \retval      -ENOENT         range or map loaded before nodemap record
862  * \retval      -EINVAL         duplicate nodemap cluster records found with
863  *                              different IDs, or nodemap has invalid name
864  * \retval      -ENOMEM
865  */
866 static int nodemap_process_keyrec(struct nodemap_config *config,
867                                   const struct nodemap_key *key,
868                                   const union nodemap_rec *rec,
869                                   struct lu_nodemap **recent_nodemap)
870 {
871         struct lu_nodemap *nodemap = NULL;
872         enum nodemap_idx_type type;
873         enum nodemap_id_type id_type;
874         struct lnet_nid nid[2];
875         int subtype;
876         u32 nodemap_id;
877         u32 map[2];
878         int rc;
879
880         ENTRY;
881
882         BUILD_BUG_ON(sizeof(union nodemap_rec) != 32);
883
884         nodemap_id = le32_to_cpu(key->nk_nodemap_id);
885         type = nodemap_get_key_type(key);
886         subtype = nodemap_get_key_subtype(key);
887         nodemap_id = nm_idx_set_type(nodemap_id, 0);
888
889         CDEBUG(D_INFO, "found config entry, nm_id %d type %d subtype %d\n",
890                nodemap_id, type, subtype);
891
892         /* find the correct nodemap in the load list */
893         if (type == NODEMAP_RANGE_IDX || type == NODEMAP_NID_MASK_IDX ||
894             type == NODEMAP_UIDMAP_IDX || type == NODEMAP_GIDMAP_IDX ||
895             type == NODEMAP_PROJIDMAP_IDX ||
896             (type == NODEMAP_CLUSTER_IDX && subtype != NODEMAP_CLUSTER_REC)) {
897                 struct lu_nodemap *tmp = NULL;
898
899                 nodemap = *recent_nodemap;
900
901                 if (nodemap == NULL)
902                         GOTO(out, rc = -ENOENT);
903
904                 if (nodemap->nm_id != nodemap_id) {
905                         list_for_each_entry(tmp, &nodemap->nm_list, nm_list)
906                                 if (tmp->nm_id == nodemap_id) {
907                                         nodemap = tmp;
908                                         break;
909                                 }
910
911                         if (nodemap->nm_id != nodemap_id)
912                                 GOTO(out, rc = -ENOENT);
913                 }
914
915                 /* update most recently used nodemap if necessay */
916                 if (nodemap != *recent_nodemap)
917                         *recent_nodemap = nodemap;
918         }
919
920         switch (type) {
921         case NODEMAP_EMPTY_IDX:
922                 if (nodemap_id != 0)
923                         CWARN("Found nodemap config record without type field, "
924                               " nodemap_id=%d. nodemap config file corrupt?\n",
925                               nodemap_id);
926                 break;
927         case NODEMAP_CLUSTER_IDX:
928                 switch (nodemap_get_key_subtype(key)) {
929                 case NODEMAP_CLUSTER_REC:
930                         rc = nodemap_cluster_rec_helper(config, nodemap_id, rec,
931                                                         recent_nodemap);
932                         if (rc != 0)
933                                 GOTO(out, rc);
934                         break;
935                 case NODEMAP_CLUSTER_ROLES:
936                         rc = nodemap_cluster_roles_helper(nodemap, rec);
937                         if (rc != 0)
938                                 GOTO(out, rc);
939                         break;
940                 default:
941                         CWARN("%s: ignoring keyrec of type %d with subtype %u\n",
942                               nodemap->nm_name, NODEMAP_CLUSTER_IDX,
943                               nodemap_get_key_subtype(key));
944                         break;
945                 }
946                 break;
947         case NODEMAP_RANGE_IDX:
948                 lnet_nid4_to_nid(le64_to_cpu(rec->nrr.nrr_start_nid), &nid[0]);
949                 lnet_nid4_to_nid(le64_to_cpu(rec->nrr.nrr_end_nid), &nid[1]);
950                 rc = nodemap_add_range_helper(config, nodemap, nid, 0,
951                                               le32_to_cpu(key->nk_range_id));
952                 if (rc != 0)
953                         GOTO(out, rc);
954                 break;
955         case NODEMAP_NID_MASK_IDX:
956                 nid[0] = rec->nrr2.nrr_nid_prefix;
957                 nid[1] = rec->nrr2.nrr_nid_prefix;
958                 rc = nodemap_add_range_helper(config, nodemap, nid,
959                                               rec->nrr2.nrr_netmask,
960                                               le32_to_cpu(key->nk_range_id));
961                 if (rc != 0)
962                         GOTO(out, rc);
963                 break;
964         case NODEMAP_UIDMAP_IDX:
965         case NODEMAP_GIDMAP_IDX:
966         case NODEMAP_PROJIDMAP_IDX:
967                 map[0] = le32_to_cpu(key->nk_id_client);
968                 map[1] = le32_to_cpu(rec->nir.nir_id_fs);
969
970                 if (type == NODEMAP_UIDMAP_IDX)
971                         id_type = NODEMAP_UID;
972                 else if (type == NODEMAP_GIDMAP_IDX)
973                         id_type = NODEMAP_GID;
974                 else if (type == NODEMAP_PROJIDMAP_IDX)
975                         id_type = NODEMAP_PROJID;
976                 else
977                         GOTO(out, rc = -EINVAL);
978
979                 rc = nodemap_add_idmap_helper(nodemap, id_type, map);
980                 if (rc != 0)
981                         GOTO(out, rc);
982                 break;
983         case NODEMAP_GLOBAL_IDX:
984                 switch (key->nk_unused) {
985                 case 0:
986                         config->nmc_nodemap_is_active = rec->ngr.ngr_is_active;
987                         break;
988                 default:
989                         CWARN("%s: ignoring keyrec of type %d with subtype %u\n",
990                               recent_nodemap ?
991                                (*recent_nodemap)->nm_name : "nodemap",
992                               NODEMAP_GLOBAL_IDX, key->nk_unused);
993                         break;
994                 }
995                 break;
996         default:
997                 CWARN("%s: ignoring key %u:%u for unknown type %u\n",
998                       recent_nodemap ? (*recent_nodemap)->nm_name : "nodemap",
999                       key->nk_nodemap_id & 0x0FFFFFFF, key->nk_unused, type);
1000                 break;
1001         }
1002
1003         rc = type;
1004
1005         EXIT;
1006
1007 out:
1008         return rc;
1009 }
1010
/**
 * Passes made over the nodemap index while reading a configuration.
 */
enum nm_config_passes {
        /* only NODEMAP_CLUSTER_IDX records of subtype NODEMAP_CLUSTER_REC */
        NM_READ_CLUSTERS        = 0,
        /* every other record, except those of type NODEMAP_EMPTY_IDX */
        NM_READ_ATTRIBUTES      = 1,
};
1015
1016 static int nodemap_load_entries(const struct lu_env *env,
1017                                 struct dt_object *nodemap_idx)
1018 {
1019         const struct dt_it_ops *iops;
1020         struct dt_it *it;
1021         struct lu_nodemap *recent_nodemap = NULL;
1022         struct nodemap_config *new_config = NULL;
1023         u64 hash = 0;
1024         bool activate_nodemap = false;
1025         bool loaded_global_idx = false;
1026         enum nm_config_passes cur_pass = NM_READ_CLUSTERS;
1027         int rc = 0;
1028
1029         ENTRY;
1030
1031         iops = &nodemap_idx->do_index_ops->dio_it;
1032
1033         dt_read_lock(env, nodemap_idx, 0);
1034         it = iops->init(env, nodemap_idx, 0);
1035         if (IS_ERR(it))
1036                 GOTO(out, rc = PTR_ERR(it));
1037
1038         rc = iops->load(env, it, hash);
1039         if (rc < 0)
1040                 GOTO(out_iops_fini, rc);
1041
1042         /* rc == 0 means we need to advance to record */
1043         if (rc == 0) {
1044                 rc = iops->next(env, it);
1045
1046                 if (rc < 0)
1047                         GOTO(out_iops_put, rc);
1048                 /* rc > 0 is eof, will be checked in while below */
1049         } else {
1050                 /* rc == 1, we found initial record and can process below */
1051                 rc = 0;
1052         }
1053
1054         new_config = nodemap_config_alloc();
1055         if (IS_ERR(new_config)) {
1056                 rc = PTR_ERR(new_config);
1057                 new_config = NULL;
1058                 GOTO(out_iops_put, rc);
1059         }
1060
1061         /* rc > 0 is eof, check initial iops->next here as well */
1062         while (rc == 0) {
1063                 struct nodemap_key *key;
1064                 union nodemap_rec rec;
1065                 enum nodemap_idx_type key_type;
1066                 int sub_type;
1067
1068                 key = (struct nodemap_key *)iops->key(env, it);
1069                 key_type = nodemap_get_key_type((struct nodemap_key *)key);
1070                 sub_type = nodemap_get_key_subtype((struct nodemap_key *)key);
1071                 if ((cur_pass == NM_READ_CLUSTERS &&
1072                      key_type == NODEMAP_CLUSTER_IDX &&
1073                      sub_type == NODEMAP_CLUSTER_REC) ||
1074                     (cur_pass == NM_READ_ATTRIBUTES &&
1075                      (key_type != NODEMAP_CLUSTER_IDX ||
1076                       sub_type != NODEMAP_CLUSTER_REC) &&
1077                      key_type != NODEMAP_EMPTY_IDX)) {
1078                         rc = iops->rec(env, it, (struct dt_rec *)&rec, 0);
1079                         if (rc != -ESTALE) {
1080                                 if (rc != 0)
1081                                         GOTO(out_nodemap_config, rc);
1082                                 rc = nodemap_process_keyrec(new_config, key, &rec,
1083                                                             &recent_nodemap);
1084                                 if (rc < 0)
1085                                         GOTO(out_nodemap_config, rc);
1086                                 if (rc == NODEMAP_GLOBAL_IDX)
1087                                         loaded_global_idx = true;
1088                         }
1089                 }
1090
1091                 do
1092                         rc = iops->next(env, it);
1093                 while (rc == -ESTALE);
1094
1095                 /* move to second pass */
1096                 if (rc > 0 && cur_pass == NM_READ_CLUSTERS) {
1097                         cur_pass = NM_READ_ATTRIBUTES;
1098                         rc = iops->load(env, it, 0);
1099                         if (rc == 0)
1100                                 rc = iops->next(env, it);
1101                         else if (rc > 0)
1102                                 rc = 0;
1103                         else
1104                                 GOTO(out, rc);
1105                 }
1106         }
1107
1108         if (rc > 0)
1109                 rc = 0;
1110
1111 out_nodemap_config:
1112         if (rc != 0)
1113                 nodemap_config_dealloc(new_config);
1114         else
1115                 /* creating new default needs to be done outside dt read lock */
1116                 activate_nodemap = true;
1117 out_iops_put:
1118         iops->put(env, it);
1119 out_iops_fini:
1120         iops->fini(env, it);
1121 out:
1122         dt_read_unlock(env, nodemap_idx);
1123
1124         if (rc != 0)
1125                 CWARN("%s: failed to load nodemap configuration: rc = %d\n",
1126                       nodemap_idx->do_lu.lo_dev->ld_obd->obd_name, rc);
1127
1128         if (!activate_nodemap)
1129                 RETURN(rc);
1130
1131         if (new_config->nmc_default_nodemap == NULL) {
1132                 /* new MGS won't have a default nm on disk, so create it here */
1133                 struct lu_nodemap *nodemap =
1134                         nodemap_create(DEFAULT_NODEMAP, new_config, 1);
1135                 if (IS_ERR(nodemap)) {
1136                         rc = PTR_ERR(nodemap);
1137                 } else {
1138                         rc = nodemap_idx_cluster_add_update(
1139                                         new_config->nmc_default_nodemap,
1140                                         nodemap_idx,
1141                                         NM_ADD, NODEMAP_CLUSTER_REC);
1142                         nodemap_putref(new_config->nmc_default_nodemap);
1143                 }
1144         }
1145
1146         /* new nodemap config won't have an active/inactive record */
1147         if (rc == 0 && loaded_global_idx == false) {
1148                 struct nodemap_key       nk;
1149                 union nodemap_rec        nr;
1150
1151                 nodemap_global_key_init(&nk);
1152                 nodemap_global_rec_init(&nr, false);
1153                 rc = nodemap_idx_insert(env, nodemap_idx, &nk, &nr);
1154         }
1155
1156         if (rc == 0)
1157                 nodemap_config_set_active(new_config);
1158         else
1159                 nodemap_config_dealloc(new_config);
1160
1161         RETURN(rc);
1162 }
1163
1164 /**
1165  * Step through active config and write to disk.
1166  */
1167 static struct dt_object *
1168 nodemap_save_config_cache(const struct lu_env *env,
1169                           struct dt_device *dev,
1170                           struct local_oid_storage *los)
1171 {
1172         struct dt_object *o;
1173         struct lu_nodemap *nodemap;
1174         struct lu_nodemap *nm_tmp;
1175         struct lu_nid_range *range;
1176         struct lu_nid_range *range_temp;
1177         struct lu_idmap *idmap;
1178         struct lu_idmap *id_tmp;
1179         struct rb_root root;
1180         struct nodemap_key nk;
1181         union nodemap_rec nr;
1182         LIST_HEAD(nodemap_list_head);
1183         int rc = 0, rc2;
1184
1185         ENTRY;
1186
1187         /* create a new index file to fill with active config */
1188         o = nodemap_cache_find_create(env, dev, los, NCFC_CREATE_NEW);
1189         if (IS_ERR(o))
1190                 RETURN(o);
1191
1192         mutex_lock(&active_config_lock);
1193
1194         /* convert hash to list so we don't spin */
1195         cfs_hash_for_each_safe(active_config->nmc_nodemap_hash,
1196                                nm_hash_list_cb, &nodemap_list_head);
1197
1198         list_for_each_entry_safe(nodemap, nm_tmp, &nodemap_list_head, nm_list) {
1199                 nodemap_cluster_key_init(&nk, nodemap->nm_id,
1200                                          NODEMAP_CLUSTER_REC);
1201                 nodemap_cluster_rec_init(&nr, nodemap);
1202
1203                 rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1204                 if (rc2 < 0) {
1205                         rc = rc2;
1206                         continue;
1207                 }
1208
1209                 /* only insert NODEMAP_CLUSTER_ROLES idx in saved config cache
1210                  * if nmf_rbac is not default value NODEMAP_RBAC_ALL
1211                  */
1212                 if (nodemap->nmf_rbac != NODEMAP_RBAC_ALL) {
1213                         nodemap_cluster_key_init(&nk, nodemap->nm_id,
1214                                                  NODEMAP_CLUSTER_ROLES);
1215                         nodemap_cluster_roles_rec_init(&nr, nodemap);
1216                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1217                         if (rc2 < 0)
1218                                 rc = rc2;
1219                 }
1220
1221                 down_read(&active_config->nmc_range_tree_lock);
1222                 list_for_each_entry_safe(range, range_temp, &nodemap->nm_ranges,
1223                                          rn_list) {
1224                         enum nodemap_idx_type type;
1225
1226                         type = range->rn_netmask ? NODEMAP_NID_MASK_IDX :
1227                                                    NODEMAP_RANGE_IDX;
1228                         nodemap_range_key_init(&nk, type, nodemap->nm_id,
1229                                                range->rn_id);
1230                         rc2 = nodemap_range_rec_init(&nr, range);
1231                         if (rc2 < 0) {
1232                                 rc = rc2;
1233                                 continue;
1234                         }
1235                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1236                         if (rc2 < 0)
1237                                 rc = rc2;
1238                 }
1239                 up_read(&active_config->nmc_range_tree_lock);
1240
1241                 /* we don't need to take nm_idmap_lock because active config
1242                  * lock prevents changes from happening to nodemaps
1243                  */
1244                 root = nodemap->nm_client_to_fs_uidmap;
1245                 rbtree_postorder_for_each_entry_safe(idmap, id_tmp, &root,
1246                                                      id_client_to_fs) {
1247                         nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_UID,
1248                                                idmap->id_client);
1249                         nodemap_idmap_rec_init(&nr, idmap->id_fs);
1250                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1251                         if (rc2 < 0)
1252                                 rc = rc2;
1253                 }
1254
1255                 root = nodemap->nm_client_to_fs_gidmap;
1256                 rbtree_postorder_for_each_entry_safe(idmap, id_tmp, &root,
1257                                                      id_client_to_fs) {
1258                         nodemap_idmap_key_init(&nk, nodemap->nm_id, NODEMAP_GID,
1259                                                idmap->id_client);
1260                         nodemap_idmap_rec_init(&nr, idmap->id_fs);
1261                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1262                         if (rc2 < 0)
1263                                 rc = rc2;
1264                 }
1265
1266                 root = nodemap->nm_client_to_fs_projidmap;
1267                 rbtree_postorder_for_each_entry_safe(idmap, id_tmp, &root,
1268                                                      id_client_to_fs) {
1269                         nodemap_idmap_key_init(&nk, nodemap->nm_id,
1270                                                NODEMAP_PROJID,
1271                                                idmap->id_client);
1272                         nodemap_idmap_rec_init(&nr, idmap->id_fs);
1273                         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1274                         if (rc2 < 0)
1275                                 rc = rc2;
1276                 }
1277         }
1278         nodemap_global_key_init(&nk);
1279         nodemap_global_rec_init(&nr, active_config->nmc_nodemap_is_active);
1280         rc2 = nodemap_idx_insert(env, o, &nk, &nr);
1281         if (rc2 < 0)
1282                 rc = rc2;
1283
1284         mutex_unlock(&active_config_lock);
1285
1286         if (rc < 0) {
1287                 dt_object_put(env, o);
1288                 o = ERR_PTR(rc);
1289         }
1290
1291         RETURN(o);
1292 }
1293
1294 static void nodemap_save_all_caches(void)
1295 {
1296         struct nm_config_file   *ncf;
1297         struct lu_env            env;
1298         int                      rc = 0;
1299
1300         /* recreating nodemap cache requires fld_thread_key be in env */
1301         rc = lu_env_init(&env, LCT_MD_THREAD | LCT_DT_THREAD | LCT_MG_THREAD);
1302         if (rc != 0) {
1303                 CWARN("cannot init env for nodemap config: rc = %d\n", rc);
1304                 return;
1305         }
1306
1307         mutex_lock(&ncf_list_lock);
1308         list_for_each_entry(ncf, &ncf_list_head, ncf_list) {
1309                 struct dt_device *dev = lu2dt_dev(ncf->ncf_obj->do_lu.lo_dev);
1310                 struct obd_device *obd = ncf->ncf_obj->do_lu.lo_dev->ld_obd;
1311                 struct dt_object *o;
1312
1313                 /* put current config file so save conf can rewrite it */
1314                 dt_object_put_nocache(&env, ncf->ncf_obj);
1315                 ncf->ncf_obj = NULL;
1316
1317                 o = nodemap_save_config_cache(&env, dev, ncf->ncf_los);
1318                 if (IS_ERR(o))
1319                         CWARN("%s: error writing to nodemap config: rc = %d\n",
1320                               obd->obd_name, rc);
1321                 else
1322                         ncf->ncf_obj = o;
1323         }
1324         mutex_unlock(&ncf_list_lock);
1325
1326         lu_env_fini(&env);
1327 }
1328
/* Tracks if config still needs to be loaded, either from disk or network:
 *  0: not loaded yet (also the value stored after a failed load)
 *  1: successfully loaded
 * -1: loading in progress
 *
 * Writers in this file change it with nodemap_config_loaded_lock held;
 * nodemap_loading() reads it without taking the lock.
 */
static int nodemap_config_loaded;
static DEFINE_MUTEX(nodemap_config_loaded_lock);
1336
1337 bool nodemap_loading(void)
1338 {
1339         return (nodemap_config_loaded == -1);
1340 }
1341
1342 void nodemap_config_set_loading_mgc(bool loading)
1343 {
1344         mutex_lock(&nodemap_config_loaded_lock);
1345         nodemap_config_loaded = loading ? -1 : 0;
1346         mutex_unlock(&nodemap_config_loaded_lock);
1347 }
1348 EXPORT_SYMBOL(nodemap_config_set_loading_mgc);
1349
/**
 * Ensures that configs loaded over the wire are prioritized over those loaded
 * from disk.
 *
 * With nodemap_config_loaded_lock held, \a config is made the active
 * configuration, the configuration is marked as loaded, and the registered
 * caches are rewritten through nodemap_save_all_caches().
 *
 * \param config        config to set as the active config
 */
void nodemap_config_set_active_mgc(struct nodemap_config *config)
{
        mutex_lock(&nodemap_config_loaded_lock);
        nodemap_config_set_active(config);
        /* 1 == loaded: targets registering later will not load their cache */
        nodemap_config_loaded = 1;
        nodemap_save_all_caches();
        mutex_unlock(&nodemap_config_loaded_lock);
}
EXPORT_SYMBOL(nodemap_config_set_active_mgc);
1365
1366 /**
1367  * Register a dt_object representing the config index file. This should be
1368  * called by targets in order to load the nodemap configuration from disk. The
1369  * dt_object should be created with local_index_find_or_create and the index
1370  * features should be enabled with do_index_try.
1371  *
1372  * \param obj   dt_object returned by local_index_find_or_create
1373  *
1374  * \retval      on success: nm_config_file handle for later deregistration
1375  * \retval      -ENOMEM         memory allocation failure
1376  * \retval      -ENOENT         error loading nodemap config
1377  * \retval      -EINVAL         error loading nodemap config
1378  * \retval      -EEXIST         nodemap config already registered for MGS
1379  */
1380 struct nm_config_file *nm_config_file_register_mgs(const struct lu_env *env,
1381                                                    struct dt_object *obj,
1382                                                    struct local_oid_storage *los)
1383 {
1384         struct nm_config_file *ncf;
1385         int rc = 0;
1386         ENTRY;
1387
1388         if (nodemap_mgs())
1389                 GOTO(out, ncf = ERR_PTR(-EEXIST));
1390
1391         OBD_ALLOC_PTR(ncf);
1392         if (ncf == NULL)
1393                 GOTO(out, ncf = ERR_PTR(-ENOMEM));
1394
1395         /* if loading from cache, prevent activation of MGS config until cache
1396          * loading is done, so disk config is overwritten by MGS config.
1397          */
1398         mutex_lock(&nodemap_config_loaded_lock);
1399         nodemap_config_loaded = -1;
1400         rc = nodemap_load_entries(env, obj);
1401         nodemap_config_loaded = !rc;
1402         mutex_unlock(&nodemap_config_loaded_lock);
1403
1404         if (rc) {
1405                 OBD_FREE_PTR(ncf);
1406                 GOTO(out, ncf = ERR_PTR(rc));
1407         }
1408
1409         lu_object_get(&obj->do_lu);
1410
1411         ncf->ncf_obj = obj;
1412         ncf->ncf_los = los;
1413
1414         nodemap_mgs_ncf = ncf;
1415
1416 out:
1417         return ncf;
1418 }
1419 EXPORT_SYMBOL(nm_config_file_register_mgs);
1420
1421 struct nm_config_file *nm_config_file_register_tgt(const struct lu_env *env,
1422                                                    struct dt_device *dev,
1423                                                    struct local_oid_storage *los)
1424 {
1425         struct nm_config_file *ncf;
1426         struct dt_object *config_obj = NULL;
1427         int rc = 0;
1428
1429         OBD_ALLOC_PTR(ncf);
1430         if (ncf == NULL)
1431                 RETURN(ERR_PTR(-ENOMEM));
1432
1433         /* don't load from cache if config already loaded */
1434         mutex_lock(&nodemap_config_loaded_lock);
1435         if (nodemap_config_loaded < 1) {
1436                 config_obj = nodemap_cache_find_create(env, dev, los, 0);
1437                 if (IS_ERR(config_obj)) {
1438                         rc = PTR_ERR(config_obj);
1439                 } else {
1440                         nodemap_config_loaded = -1;
1441                         rc = nodemap_load_entries(env, config_obj);
1442                 }
1443                 nodemap_config_loaded = !rc;
1444         }
1445         mutex_unlock(&nodemap_config_loaded_lock);
1446         if (rc)
1447                 GOTO(out_ncf, rc);
1448
1449         /* sync on disk caches w/ loaded config in memory, ncf_obj may change */
1450         if (!config_obj) {
1451                 config_obj = nodemap_save_config_cache(env, dev, los);
1452                 if (IS_ERR(config_obj))
1453                         GOTO(out_ncf, rc = PTR_ERR(config_obj));
1454         }
1455
1456         ncf->ncf_obj = config_obj;
1457         ncf->ncf_los = los;
1458
1459         mutex_lock(&ncf_list_lock);
1460         list_add(&ncf->ncf_list, &ncf_list_head);
1461         mutex_unlock(&ncf_list_lock);
1462
1463 out_ncf:
1464         if (rc) {
1465                 OBD_FREE_PTR(ncf);
1466                 RETURN(ERR_PTR(rc));
1467         }
1468
1469         RETURN(ncf);
1470 }
1471 EXPORT_SYMBOL(nm_config_file_register_tgt);
1472
1473 /**
1474  * Deregister a nm_config_file. Should be called by targets during cleanup.
1475  *
1476  * \param ncf   config file to deregister
1477  */
1478 void nm_config_file_deregister_mgs(const struct lu_env *env,
1479                                    struct nm_config_file *ncf)
1480 {
1481         ENTRY;
1482         LASSERT(nodemap_mgs_ncf == ncf);
1483
1484         nodemap_mgs_ncf = NULL;
1485         if (ncf->ncf_obj)
1486                 dt_object_put(env, ncf->ncf_obj);
1487
1488         OBD_FREE_PTR(ncf);
1489
1490         EXIT;
1491 }
1492 EXPORT_SYMBOL(nm_config_file_deregister_mgs);
1493
1494 void nm_config_file_deregister_tgt(const struct lu_env *env,
1495                                    struct nm_config_file *ncf)
1496 {
1497         ENTRY;
1498
1499         if (ncf == NULL)
1500                 return;
1501
1502         mutex_lock(&ncf_list_lock);
1503         list_del(&ncf->ncf_list);
1504         mutex_unlock(&ncf_list_lock);
1505
1506         if (ncf->ncf_obj)
1507                 dt_object_put(env, ncf->ncf_obj);
1508
1509         OBD_FREE_PTR(ncf);
1510
1511         EXIT;
1512 }
1513 EXPORT_SYMBOL(nm_config_file_deregister_tgt);
1514
1515 int nodemap_process_idx_pages(struct nodemap_config *config, union lu_page *lip,
1516                               struct lu_nodemap **recent_nodemap)
1517 {
1518         struct nodemap_key *key;
1519         union nodemap_rec *rec;
1520         char *entry;
1521         int j;
1522         int k;
1523         int rc = 0;
1524         int size = dt_nodemap_features.dif_keysize_max +
1525                    dt_nodemap_features.dif_recsize_max;
1526         ENTRY;
1527
1528         for (j = 0; j < LU_PAGE_COUNT; j++) {
1529                 if (lip->lp_idx.lip_magic != LIP_MAGIC)
1530                         return -EINVAL;
1531
1532                 /* get and process keys and records from page */
1533                 for (k = 0; k < lip->lp_idx.lip_nr; k++) {
1534                         entry = lip->lp_idx.lip_entries + k * size;
1535                         key = (struct nodemap_key *)entry;
1536
1537                         entry += dt_nodemap_features.dif_keysize_max;
1538                         rec = (union nodemap_rec *)entry;
1539
1540                         rc = nodemap_process_keyrec(config, key, rec,
1541                                                     recent_nodemap);
1542                         if (rc < 0)
1543                                 return rc;
1544                 }
1545                 lip++;
1546         }
1547
1548         EXIT;
1549         return 0;
1550 }
1551 EXPORT_SYMBOL(nodemap_process_idx_pages);
1552
1553 static int nodemap_page_build(const struct lu_env *env, struct dt_object *obj,
1554                               union lu_page *lp, size_t bytes,
1555                               const struct dt_it_ops *iops,
1556                               struct dt_it *it, __u32 attr, void *arg)
1557 {
1558         struct idx_info *ii = (struct idx_info *)arg;
1559         struct lu_idxpage *lip = &lp->lp_idx;
1560         char *entry;
1561         size_t size = ii->ii_keysize + ii->ii_recsize;
1562         int rc;
1563         ENTRY;
1564
1565         if (bytes < LIP_HDR_SIZE)
1566                 return -EINVAL;
1567
1568         /* initialize the header of the new container */
1569         memset(lip, 0, LIP_HDR_SIZE);
1570         lip->lip_magic = LIP_MAGIC;
1571         bytes -= LIP_HDR_SIZE;
1572
1573         entry = lip->lip_entries;
1574         do {
1575                 char *tmp_entry = entry;
1576                 struct dt_key *key;
1577                 __u64 hash;
1578                 enum nodemap_idx_type key_type;
1579                 int sub_type;
1580
1581                 /* fetch 64-bit hash value */
1582                 hash = iops->store(env, it);
1583                 ii->ii_hash_end = hash;
1584
1585                 if (CFS_FAIL_CHECK(OBD_FAIL_OBD_IDX_READ_BREAK)) {
1586                         if (lip->lip_nr != 0)
1587                                 GOTO(out, rc = 0);
1588                 }
1589
1590                 if (bytes < size) {
1591                         if (lip->lip_nr == 0)
1592                                 GOTO(out, rc = -EINVAL);
1593                         GOTO(out, rc = 0);
1594                 }
1595
1596                 key = iops->key(env, it);
1597                 key_type = nodemap_get_key_type((struct nodemap_key *)key);
1598                 sub_type = nodemap_get_key_subtype((struct nodemap_key *)key);
1599
1600                 /* on the first pass, get only the cluster types. On second
1601                  * pass, get all the rest */
1602                 if ((ii->ii_attrs == NM_READ_CLUSTERS &&
1603                      key_type == NODEMAP_CLUSTER_IDX &&
1604                      sub_type == NODEMAP_CLUSTER_REC) ||
1605                     (ii->ii_attrs == NM_READ_ATTRIBUTES &&
1606                      (key_type != NODEMAP_CLUSTER_IDX ||
1607                       sub_type != NODEMAP_CLUSTER_REC) &&
1608                      key_type != NODEMAP_EMPTY_IDX)) {
1609                         memcpy(tmp_entry, key, ii->ii_keysize);
1610                         tmp_entry += ii->ii_keysize;
1611
1612                         /* and finally the record */
1613                         rc = iops->rec(env, it, (struct dt_rec *)tmp_entry,
1614                                        attr);
1615                         if (rc != -ESTALE) {
1616                                 if (rc != 0)
1617                                         GOTO(out, rc);
1618
1619                                 /* hash/key/record successfully copied! */
1620                                 lip->lip_nr++;
1621                                 if (unlikely(lip->lip_nr == 1 &&
1622                                     ii->ii_count == 0))
1623                                         ii->ii_hash_start = hash;
1624
1625                                 entry = tmp_entry + ii->ii_recsize;
1626                                 bytes -= size;
1627                         }
1628                 }
1629
1630                 /* move on to the next record */
1631                 do {
1632                         rc = iops->next(env, it);
1633                 } while (rc == -ESTALE);
1634
1635                 /* move to second pass */
1636                 if (rc > 0 && ii->ii_attrs == NM_READ_CLUSTERS) {
1637                         ii->ii_attrs = NM_READ_ATTRIBUTES;
1638                         rc = iops->load(env, it, 0);
1639                         if (rc == 0)
1640                                 rc = iops->next(env, it);
1641                         else if (rc > 0)
1642                                 rc = 0;
1643                         else
1644                                 GOTO(out, rc);
1645                 }
1646
1647         } while (rc == 0);
1648
1649         GOTO(out, rc);
1650 out:
1651         if (rc >= 0 && lip->lip_nr > 0)
1652                 /* one more container */
1653                 ii->ii_count++;
1654         if (rc > 0)
1655                 /* no more entries */
1656                 ii->ii_hash_end = II_END_OFF;
1657         return rc;
1658 }
1659
1660 int nodemap_index_read(struct lu_env *env, struct nm_config_file *ncf,
1661                        struct idx_info *ii, const struct lu_rdpg *rdpg)
1662 {
1663         struct dt_object        *nodemap_idx = ncf->ncf_obj;
1664         __u64                    version;
1665         int                      rc = 0;
1666
1667         ii->ii_keysize = dt_nodemap_features.dif_keysize_max;
1668         ii->ii_recsize = dt_nodemap_features.dif_recsize_max;
1669
1670         dt_read_lock(env, nodemap_idx, 0);
1671         version = dt_version_get(env, nodemap_idx);
1672         if (rdpg->rp_hash != 0 && ii->ii_version != version) {
1673                 CDEBUG(D_INFO, "nodemap config changed inflight, old %llu, new %llu\n",
1674                        ii->ii_version,
1675                        version);
1676                 ii->ii_hash_end = 0;
1677         } else {
1678                 rc = dt_index_walk(env, nodemap_idx, rdpg, nodemap_page_build,
1679                                    ii);
1680                 CDEBUG(D_INFO, "walked index, hashend %llx\n", ii->ii_hash_end);
1681         }
1682
1683         if (rc >= 0)
1684                 ii->ii_version = version;
1685
1686         /*
1687          * For partial lu_idxpage filling of the end system page,
1688          * init the header of the remain lu_idxpages.
1689          */
1690         if (rc > 0)
1691                 dt_index_page_adjust(rdpg->rp_pages, rdpg->rp_npages,
1692                                      ii->ii_count);
1693
1694         dt_read_unlock(env, nodemap_idx);
1695         return rc;
1696 }
1697 EXPORT_SYMBOL(nodemap_index_read);
1698
1699 /**
1700  * Returns the current nodemap configuration to MGC by walking the nodemap
1701  * config index and storing it in the response buffer.
1702  *
1703  * \param       req             incoming MGS_CONFIG_READ request
1704  * \retval      0               success
1705  * \retval      -EINVAL         malformed request
1706  * \retval      -ENOTCONN       client evicted/reconnected already
1707  * \retval      -ETIMEDOUT      client timeout or network error
1708  * \retval      -ENOMEM
1709  */
1710 int nodemap_get_config_req(struct obd_device *mgs_obd,
1711                            struct ptlrpc_request *req)
1712 {
1713         const struct ptlrpc_bulk_frag_ops *frag_ops = &ptlrpc_bulk_kiov_pin_ops;
1714         struct mgs_config_body *body;
1715         struct mgs_config_res *res;
1716         struct lu_rdpg rdpg;
1717         struct idx_info nodemap_ii;
1718         struct ptlrpc_bulk_desc *desc;
1719         struct tg_export_data *rqexp_ted = &req->rq_export->exp_target_data;
1720         int i;
1721         int page_count;
1722         int bytes = 0;
1723         int rc = 0;
1724
1725         body = req_capsule_client_get(&req->rq_pill, &RMF_MGS_CONFIG_BODY);
1726         if (!body)
1727                 RETURN(-EINVAL);
1728
1729         if (body->mcb_type != MGS_CFG_T_NODEMAP)
1730                 RETURN(-EINVAL);
1731
1732         rdpg.rp_count = (body->mcb_units << body->mcb_bits);
1733         rdpg.rp_npages = (rdpg.rp_count + PAGE_SIZE - 1) >>
1734                 PAGE_SHIFT;
1735         if (rdpg.rp_npages > PTLRPC_MAX_BRW_PAGES)
1736                 RETURN(-EINVAL);
1737
1738         CDEBUG(D_INFO, "reading nodemap log, name '%s', size = %u\n",
1739                body->mcb_name, rdpg.rp_count);
1740
1741         /* allocate pages to store the containers */
1742         OBD_ALLOC_PTR_ARRAY(rdpg.rp_pages, rdpg.rp_npages);
1743         if (rdpg.rp_pages == NULL)
1744                 RETURN(-ENOMEM);
1745         for (i = 0; i < rdpg.rp_npages; i++) {
1746                 rdpg.rp_pages[i] = alloc_page(GFP_NOFS);
1747                 if (rdpg.rp_pages[i] == NULL)
1748                         GOTO(out, rc = -ENOMEM);
1749         }
1750
1751         rdpg.rp_hash = body->mcb_offset;
1752         nodemap_ii.ii_magic = IDX_INFO_MAGIC;
1753         nodemap_ii.ii_flags = II_FL_NOHASH;
1754         nodemap_ii.ii_version = rqexp_ted->ted_nodemap_version;
1755         nodemap_ii.ii_attrs = body->mcb_nm_cur_pass;
1756         nodemap_ii.ii_count = 0;
1757
1758         bytes = nodemap_index_read(req->rq_svc_thread->t_env,
1759                                    obd2obt(mgs_obd)->obt_nodemap_config_file,
1760                                    &nodemap_ii, &rdpg);
1761         if (bytes < 0)
1762                 GOTO(out, rc = bytes);
1763
1764         rqexp_ted->ted_nodemap_version = nodemap_ii.ii_version;
1765
1766         res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES);
1767         if (res == NULL)
1768                 GOTO(out, rc = -EINVAL);
1769         res->mcr_offset = nodemap_ii.ii_hash_end;
1770         res->mcr_nm_cur_pass = nodemap_ii.ii_attrs;
1771
1772         page_count = (bytes + PAGE_SIZE - 1) >> PAGE_SHIFT;
1773         LASSERT(page_count <= rdpg.rp_count);
1774         desc = ptlrpc_prep_bulk_exp(req, page_count, 1,
1775                                     PTLRPC_BULK_PUT_SOURCE,
1776                                     MGS_BULK_PORTAL, frag_ops);
1777         if (desc == NULL)
1778                 GOTO(out, rc = -ENOMEM);
1779
1780         for (i = 0; i < page_count && bytes > 0; i++) {
1781                 frag_ops->add_kiov_frag(desc, rdpg.rp_pages[i], 0,
1782                                         min_t(int, bytes, PAGE_SIZE));
1783                 bytes -= PAGE_SIZE;
1784         }
1785
1786         rc = target_bulk_io(req->rq_export, desc);
1787         ptlrpc_free_bulk(desc);
1788
1789 out:
1790         if (rdpg.rp_pages != NULL) {
1791                 for (i = 0; i < rdpg.rp_npages; i++)
1792                         if (rdpg.rp_pages[i] != NULL)
1793                                 __free_page(rdpg.rp_pages[i]);
1794                 OBD_FREE_PTR_ARRAY(rdpg.rp_pages, rdpg.rp_npages);
1795         }
1796         return rc;
1797 }
1798 EXPORT_SYMBOL(nodemap_get_config_req);