Whamcloud - gitweb
LU-14051 utils: flush alr batch file in thread
[fs/lustre-release.git] / lustre / utils / ofd_access_batch.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.gnu.org/licenses/gpl-2.0.html
19  *
20  * GPL HEADER END
21  *
22  * Copyright 2020, DataDirect Networks Storage.
23  *
24  * This file is part of Lustre, http://www.lustre.org/
25  *
26  * Author: John L. Hammond <jhammond@whamcloud.com>
27  *
28  * lustre/utils/ofd_access_batch.c
29  *
30  * Access log entry batching for ofd_access_log_reader.
31  */
32 #include <stdlib.h>
33 #include <stdbool.h>
34 #include <stddef.h>
35 #include <assert.h>
36 #include <malloc.h>
37 #include <unistd.h>
38 #include <pthread.h>
39 #include <linux/lustre/lustre_access_log.h>
40 #include <linux/lustre/lustre_fid.h>
41 #include <linux/lustre/lustre_idl.h>
42 #include <libcfs/util/hash.h>
43 #include <libcfs/util/list.h>
44 #include "lstddef.h"
45 #include "ofd_access_batch.h"
46
47 /* XXX Weird param order to be consistent with list_replace_init(). */
48 static inline void list_replace_init(struct list_head *old_node,
49                                 struct list_head *new_node)
50 {
51         list_add(new_node, old_node);
52         list_del_init(old_node);
53 }
54
55 struct fid_hash_node {
56         struct list_head fhn_node;
57         struct lu_fid fhn_fid;
58 };
59
60 static inline bool fid_eq(const struct lu_fid *f1, const struct lu_fid *f2)
61 {
62         return f1->f_seq == f2->f_seq && f1->f_oid == f2->f_oid &&
63                f1->f_ver == f2->f_ver;
64 }
65
66 static inline __u64 fid_flatten(const struct lu_fid *fid)
67 {
68         __u64 ino;
69         __u64 seq;
70
71         if (fid_is_igif(fid)) {
72                 ino = lu_igif_ino(fid);
73                 return ino;
74         }
75
76         seq = fid_seq(fid);
77
78         ino = (seq << 24) + ((seq >> 24) & 0xffffff0000ULL) + fid_oid(fid);
79
80         return ino != 0 ? ino : fid_oid(fid);
81 }
82
83 /**
84  * map fid to 32 bit value for ino on 32bit systems.
85  */
86 static inline __u32 fid_flatten32(const struct lu_fid *fid)
87 {
88         __u32 ino;
89         __u64 seq;
90
91         if (fid_is_igif(fid)) {
92                 ino = lu_igif_ino(fid);
93                 return ino;
94         }
95
96         seq = fid_seq(fid) - FID_SEQ_START;
97
98         /* Map the high bits of the OID into higher bits of the inode number so
99          * that inodes generated at about the same time have a reduced chance
100          * of collisions. This will give a period of 2^12 = 1024 unique clients
101          * (from SEQ) and up to min(LUSTRE_SEQ_MAX_WIDTH, 2^20) = 128k objects
102          * (from OID), or up to 128M inodes without collisions for new files.
103          */
104         ino = ((seq & 0x000fffffULL) << 12) + ((seq >> 8) & 0xfffff000) +
105               (seq >> (64 - (40-8)) & 0xffffff00) +
106               (fid_oid(fid) & 0xff000fff) + ((fid_oid(fid) & 0x00fff000) << 8);
107
108         return ino != 0 ? ino : fid_oid(fid);
109 }
110
111 static unsigned long fid_hash(const struct lu_fid *f, unsigned int shift)
112 {
113 #if __BITS_PER_LONG == 32
114         return hash_long(fid_flatten32(f), shift);
115 #elif __BITS_PER_LONG == 64
116         return hash_long(fid_flatten(f), shift);
117 #else
118 # error "Wordsize not 32 or 64"
119 #endif
120 }
121
122 static void fhn_init(struct fid_hash_node *fhn, const struct lu_fid *fid)
123 {
124         INIT_LIST_HEAD(&fhn->fhn_node);
125         fhn->fhn_fid = *fid;
126 }
127
128 static bool fhn_is_hashed(struct fid_hash_node *fhn)
129 {
130         return !list_empty(&fhn->fhn_node);
131 }
132
133 static void fhn_del_init(struct fid_hash_node *fhn)
134 {
135         if (fhn_is_hashed(fhn))
136                 list_del_init(&fhn->fhn_node);
137 }
138
139 static inline void fhn_replace_init(struct fid_hash_node *old_fhn,
140                                 struct fid_hash_node *new_fhn)
141 {
142         list_add(&new_fhn->fhn_node, &old_fhn->fhn_node);
143         list_del_init(&old_fhn->fhn_node);
144 }
145
146 void fid_hash_add(struct list_head *head, unsigned int shift,
147                 struct fid_hash_node *fhn)
148 {
149         assert(!fhn_is_hashed(fhn));
150
151         list_add(&fhn->fhn_node, &head[fid_hash(&fhn->fhn_fid, shift)]);
152 }
153
154 struct fid_hash_node *
155 fid_hash_find(struct list_head *head, unsigned int shift, const struct lu_fid *fid)
156 {
157         struct list_head *hash_list;
158         struct fid_hash_node *fhn, *next;
159
160         hash_list = &head[fid_hash(fid, shift)];
161         list_for_each_entry_safe(fhn, next, hash_list, fhn_node) {
162                 assert(fhn_is_hashed(fhn));
163
164                 if (fid_eq(fid, &fhn->fhn_fid))
165                         return fhn;
166         }
167
168         return NULL;
169 }
170
171 struct fid_hash_node *
172 fid_hash_insert(struct list_head *head, unsigned int shift, struct fid_hash_node *new_fhn)
173 {
174         struct list_head *list;
175         struct fid_hash_node *old_fhn, *next;
176
177         list = &head[fid_hash(&new_fhn->fhn_fid, shift)];
178         list_for_each_entry_safe(old_fhn, next, list, fhn_node) {
179                 assert(fhn_is_hashed(old_fhn));
180
181                 if (fid_eq(&old_fhn->fhn_fid, &new_fhn->fhn_fid))
182                         return old_fhn;
183         }
184
185         list_add(&new_fhn->fhn_node, list);
186
187         return new_fhn;
188 }
189
190 int fid_hash_init(struct list_head **phead, unsigned int *pshift, unsigned int shift)
191 {
192         struct list_head *new_head;
193         unsigned int i;
194
195         new_head = malloc(sizeof(*new_head) << shift);
196         if (new_head == NULL)
197                 return -1;
198
199         for (i = 0; i < (1 << shift); i++)
200                 INIT_LIST_HEAD(&new_head[i]);
201
202         *phead = new_head;
203         *pshift = shift;
204
205         return 0;
206 }
207
208 int fid_hash_resize(struct list_head **phead, unsigned int *pshift, unsigned int new_shift)
209 {
210         struct list_head *new_head;
211         unsigned int i;
212         int rc;
213
214         if (*pshift == new_shift)
215                 return 0;
216
217         rc = fid_hash_init(&new_head, &new_shift, new_shift);
218         if (rc < 0)
219                 return rc;
220
221         for (i = 0; i < (1 << *pshift); i++) {
222                 struct list_head *list = &(*phead)[i];
223                 struct fid_hash_node *fhn, *next;
224
225                 list_for_each_entry_safe(fhn, next, list, fhn_node) {
226                         fhn_del_init(fhn);
227                         fid_hash_add(new_head, new_shift, fhn);
228                 }
229         }
230
231         free(*phead);
232         *phead = new_head;
233         *pshift = new_shift;
234
235         return 0;
236 }
237
238 enum {
239         ALR_READ = 0,
240         ALR_WRITE = 1,
241 };
242
243 /* Entry in the batching hash. */
244 struct alr_entry {
245         struct fid_hash_node alre_fid_hash_node;
246         time_t alre_time[2]; /* Not strictly needed. */
247         __u64 alre_begin[2];
248         __u64 alre_end[2];
249         __u64 alre_size[2];
250         __u64 alre_segment_count[2];
251         __u64 alre_count[2];
252         char alre_obd_name[];
253 };
254
255 enum {
256         ALR_BATCH_HASH_SHIFT_DEFAULT = 10,
257         ALR_BATCH_HASH_SHIFT_MAX = 30,
258 };
259
260 struct alr_batch {
261         struct list_head *alrb_hash;
262         unsigned int alrb_hash_shift;
263         unsigned int alrb_count;
264 };
265
266 static void alre_del_init(struct alr_entry *alre)
267 {
268         fhn_del_init(&alre->alre_fid_hash_node);
269 }
270
271 static void alre_update(struct alr_entry *alre, time_t time, __u64 begin,
272                         __u64 end, __u32 size, __u32 segment_count, __u32 flags)
273 {
274         unsigned int d = (flags & OFD_ACCESS_READ) ? ALR_READ : ALR_WRITE;
275
276         alre->alre_time[d] = max_t(time_t, alre->alre_time[d], time);
277         alre->alre_begin[d] = min_t(__u64, alre->alre_begin[d], begin);
278         alre->alre_end[d] = max_t(__u64, alre->alre_end[d], end);
279         alre->alre_size[d] += size;
280         alre->alre_segment_count[d] += segment_count;
281         alre->alre_count[d] += 1;
282 }
283
284 int alr_batch_add(struct alr_batch *alrb, const char *obd_name,
285                 const struct lu_fid *pfid, time_t time, __u64 begin, __u64 end,
286                 __u32 size, __u32 segment_count, __u32 flags)
287 {
288         struct fid_hash_node fhn, *p;
289         struct alr_entry *alre;
290         int rc;
291
292         if (alrb == NULL)
293                 return 0;
294
295         assert(sizeof(time_t) == sizeof(__u64));
296
297         fhn_init(&fhn, pfid);
298
299         /* Find old or insert sentinel (fhn). Replace sentinel if returned. */
300         p = fid_hash_insert(alrb->alrb_hash, alrb->alrb_hash_shift, &fhn);
301         if (p == &fhn) {
302                 size_t alre_size = sizeof(*alre) + strlen(obd_name) + 1;
303
304                 alre = calloc(1, alre_size);
305                 if (alre == NULL) {
306                         rc = -1;
307                         goto out;
308                 }
309
310                 fhn_init(&alre->alre_fid_hash_node, pfid);
311                 strcpy(alre->alre_obd_name, obd_name);
312                 fhn_replace_init(&fhn, &alre->alre_fid_hash_node);
313                 alrb->alrb_count++;
314         } else {
315                 alre = container_of(p, struct alr_entry, alre_fid_hash_node);
316         }
317
318         alre_update(alre, time, begin, end, size, segment_count, flags);
319         rc = 0;
320 out:
321         fhn_del_init(&fhn);
322
323         return rc;
324 }
325
326 int sort_compare(const void *a1, const void *a2)
327 {
328         int l = *(const int*)a1;
329         int r = *(const int *)a2;
330         if (l > r) return -1;
331         if (l < r) return  1;
332         return 0;
333 }
334
335 static void alre_printf(FILE *f, struct alr_entry *alre, int d)
336 {
337         fprintf(f, "o=%s f="DFID" t=%lld b=%llu e=%llu s=%llu g=%llu n=%llu d=%c\n",
338                 alre->alre_obd_name,
339                 PFID(&alre->alre_fid_hash_node.fhn_fid),
340                 (long long)alre->alre_time[d],
341                 (unsigned long long)alre->alre_begin[d],
342                 (unsigned long long)alre->alre_end[d],
343                 (unsigned long long)alre->alre_size[d],
344                 (unsigned long long)alre->alre_segment_count[d],
345                 (unsigned long long)alre->alre_count[d],
346                 (d == ALR_READ) ? 'r' : 'w');
347 }
348
349 struct alr_thread_arg {
350         struct list_head list;
351         int fraction;
352         FILE *file;
353         pthread_mutex_t *file_mutex;
354 };
355
356 void *alr_sort_and_print_thread(void *arg)
357 {
358         struct alr_entry *alre, *next;
359         struct alr_thread_arg *aa = arg;
360         struct list_head *tmp = &aa->list;
361         int *sa = NULL;
362         int rc, d, i, nr = 0;
363         unsigned long cut;
364
365         list_for_each_entry(alre, tmp, alre_fid_hash_node.fhn_node) {
366                 if (alre->alre_count[0] > 0)
367                         nr++;
368                 if (alre->alre_count[1] > 0)
369                         nr++;
370         }
371
372         if (nr == 0)
373                 goto out;
374
375         sa = calloc(nr, sizeof(*sa));
376         if (!sa) {
377                 fprintf(stderr, "cannot allocate memory for sorting\n");
378                 exit(1);
379         }
380
381         i = 0;
382         list_for_each_entry(alre, tmp, alre_fid_hash_node.fhn_node) {
383                 if (alre->alre_count[0] > 0)
384                         sa[i++] = alre->alre_count[0];
385                 if (alre->alre_count[1] > 0)
386                         sa[i++] = alre->alre_count[1];
387         }
388
389         qsort(sa, nr, sizeof(*sa), sort_compare);
390         i = nr * aa->fraction / 100;
391         cut = sa[i];
392         if (cut < 1)
393                 cut = 1;
394         free(sa);
395
396         /* Prevent jumbled output from multiple concurrent sort and
397          * print threads. */
398         rc = pthread_mutex_lock(aa->file_mutex);
399         if (rc != 0) {
400                 fprintf(stderr, "cannot lock batch file: %s\n",
401                         strerror(rc));
402                 exit(1);
403         }
404
405         /* there might be lots of items at @cut, but we want to limit total
406          * output. so the first loop dumps all items > @cut and the second
407          * loop dumps items=@cut so that total number (@i) is not exceeeded.
408          * XXX: possible optimization - move items=@cut to another list, so
409          * that 2nd pass takes < O(n) */
410         list_for_each_entry(alre, tmp, alre_fid_hash_node.fhn_node) {
411                 for (d = 0; d < 2; d++) {
412                         if (alre->alre_count[d] <= cut)
413                                 continue;
414                         alre_printf(aa->file, alre, d);
415                         i--;
416                 }
417         }
418
419         list_for_each_entry(alre, tmp, alre_fid_hash_node.fhn_node) {
420                 for (d = 0; d < 2 && i > 0; d++) {
421                         if (alre->alre_count[d] != cut)
422                                 continue;
423                         alre_printf(aa->file, alre, d);
424                         i--;
425                 }
426         }
427
428         rc = pthread_mutex_unlock(aa->file_mutex);
429         if (rc != 0) {
430                 fprintf(stderr, "cannot unlock batch file: %s\n",
431                         strerror(rc));
432                 exit(1);
433         }
434
435 out:
436         fflush(aa->file);
437
438         list_for_each_entry_safe(alre, next, tmp, alre_fid_hash_node.fhn_node) {
439                 alre_del_init(alre);
440                 free(alre);
441         }
442
443         free(aa);
444
445         return NULL;
446 }
447
448 /* Print, clear, and resize the batch. */
449 int alr_batch_print(struct alr_batch *alrb, FILE *file,
450                     pthread_mutex_t *file_mutex, int fraction)
451 {
452         unsigned int new_hash_shift;
453         pthread_attr_t attr, *pattr = NULL;
454         struct alr_thread_arg *aa = NULL;
455         pthread_t pid;
456         int i, rc;
457
458         if (alrb == NULL)
459                 return 0;
460
461         aa = calloc(1, sizeof(*aa));
462         if (aa == NULL)
463                 return -ENOMEM;
464
465         /* move all collected items to the temp list */
466         INIT_LIST_HEAD(&aa->list);
467         for (i = 0; i < (1 << alrb->alrb_hash_shift); i++) {
468                 if (list_empty(&alrb->alrb_hash[i]))
469                         continue;
470                 list_splice(&alrb->alrb_hash[i], &aa->list);
471                 INIT_LIST_HEAD(&alrb->alrb_hash[i]);
472         }
473         aa->file = file;
474         aa->file_mutex = file_mutex;
475         aa->fraction = fraction;
476
477         rc = pthread_attr_init(&attr);
478         if (rc != 0)
479                 goto out;
480
481         pattr = &attr;
482
483         rc = pthread_attr_setdetachstate(pattr, PTHREAD_CREATE_DETACHED);
484         if (rc != 0)
485                 goto out;
486
487         /* as sorting may take time and we don't want to lose access
488          * records we better do sorting and printing in a different thread */
489         rc = pthread_create(&pid, pattr, &alr_sort_and_print_thread, aa);
490         if (rc != 0)
491                 goto out;
492
493         aa = NULL; /* Sort and print thread owns it now. */
494 out:
495         /* Resize hash based on previous count. */
496         new_hash_shift = alrb->alrb_hash_shift;
497
498         while (new_hash_shift < ALR_BATCH_HASH_SHIFT_MAX &&
499                (1 << new_hash_shift) < alrb->alrb_count)
500                 new_hash_shift++;
501
502         fid_hash_resize(&alrb->alrb_hash, &alrb->alrb_hash_shift,
503                         new_hash_shift);
504
505         alrb->alrb_count = 0;
506
507         if (pattr != NULL)
508                 pthread_attr_destroy(pattr);
509
510         if (aa != NULL) {
511                 struct alr_entry *alre, *next;
512
513                 list_for_each_entry_safe(alre, next, &aa->list,
514                                          alre_fid_hash_node.fhn_node) {
515                         alre_del_init(alre);
516                         free(alre);
517                 }
518         }
519
520         free(aa);
521
522         if (rc > 0)
523                 rc = -rc; /* Fixup pthread return conventions. */
524
525         return rc;
526 }
527
528 struct alr_batch *alr_batch_create(unsigned int shift)
529 {
530         struct alr_batch *alrb;
531         int rc;
532
533         if (shift == -1U)
534                 shift = ALR_BATCH_HASH_SHIFT_DEFAULT;
535
536         alrb = calloc(1, sizeof(*alrb));
537         if (alrb == NULL)
538                 return NULL;
539
540         rc = fid_hash_init(&alrb->alrb_hash, &alrb->alrb_hash_shift, shift);
541         if (rc < 0) {
542                 free(alrb);
543                 return NULL;
544         }
545
546         return alrb;
547 }
548
549 void alr_batch_destroy(struct alr_batch *alrb)
550 {
551         unsigned int i;
552
553         if (alrb == NULL)
554                 return;
555
556         for (i = 0; i < (1 << alrb->alrb_hash_shift); i++) {
557                 struct list_head *list = &alrb->alrb_hash[i];
558                 struct alr_entry *alre, *next;
559
560                 list_for_each_entry_safe(alre, next, list, alre_fid_hash_node.fhn_node) {
561                         alre_del_init(alre);
562                         free(alre);
563                 }
564         }
565
566         free(alrb->alrb_hash);
567         free(alrb);
568 }