Whamcloud - gitweb
LU-3339 mdt: HSM on disk actions record
[fs/lustre-release.git] / lustre / mdt / mdt_hsm_cdt_actions.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * (C) Copyright 2012 Commissariat a l'energie atomique et aux energies
24  *     alternatives
25  *
26  */
27 /*
28  * lustre/mdt/mdt_agent_actions.c
29  *
30  * Lustre HSM
31  *
32  * Author: Jacques-Charles Lafoucriere <jacques-charles.lafoucriere@cea.fr>
33  * Author: Aurelien Degremont <aurelien.degremont@cea.fr>
34  */
35
36 #define DEBUG_SUBSYSTEM S_MDS
37
38 #include <obd_support.h>
39 #include <lustre_net.h>
40 #include <lustre_export.h>
41 #include <obd.h>
42 #include <lprocfs_status.h>
43 #include <lustre_log.h>
44 #include "mdt_internal.h"
45
46 void dump_llog_agent_req_rec(char *prefix, struct llog_agent_req_rec *larr)
47 {
48         char    buf[12];
49         int     sz;
50
51         sz = larr->arr_hai.hai_len - sizeof(larr->arr_hai);
52         CDEBUG(D_HSM, "%slrh=[type=%X len=%d idx=%d] fid="DFID
53                " dfid="DFID
54                " compound/cookie="LPX64"/"LPX64
55                " status=%s action=%s archive#=%d flags="LPX64
56                " create="LPU64" change="LPU64
57                " extent="LPX64"-"LPX64" gid="LPX64" datalen=%d"
58                " data=[%s]\n",
59                prefix,
60                larr->arr_hdr.lrh_type,
61                larr->arr_hdr.lrh_len, larr->arr_hdr.lrh_index,
62                PFID(&larr->arr_hai.hai_fid),
63                PFID(&larr->arr_hai.hai_dfid),
64                larr->arr_compound_id, larr->arr_hai.hai_cookie,
65                agent_req_status2name(larr->arr_status),
66                hsm_copytool_action2name(larr->arr_hai.hai_action),
67                larr->arr_archive_id,
68                larr->arr_flags,
69                larr->arr_req_create, larr->arr_req_change,
70                larr->arr_hai.hai_extent.offset,
71                larr->arr_hai.hai_extent.length,
72                larr->arr_hai.hai_gid, sz,
73                hai_dump_data_field(&larr->arr_hai, buf, sizeof(buf)));
74 }
75
76 /*
77  * process the actions llog
78  * \param env [IN] environment
79  * \param mdt [IN] MDT device
80  * \param cb [IN] llog callback funtion
81  * \param data [IN] llog callback  data
82  * \retval 0 success
83  * \retval -ve failure
84  */
85 int cdt_llog_process(const struct lu_env *env, struct mdt_device *mdt,
86                      llog_cb_t cb, void *data)
87 {
88         struct obd_device       *obd = mdt2obd_dev(mdt);
89         struct llog_ctxt        *lctxt = NULL;
90         struct coordinator      *cdt = &mdt->mdt_coordinator;
91         int                      rc;
92         ENTRY;
93
94         lctxt = llog_get_context(obd, LLOG_AGENT_ORIG_CTXT);
95         if ((lctxt == NULL) || (lctxt->loc_handle == NULL))
96                 RETURN(-ENOENT);
97
98         down(&cdt->cdt_llog_lock);
99
100         rc = llog_cat_process(env, lctxt->loc_handle, cb, data, 0, 0);
101         if (rc < 0)
102                 CERROR("%s: failed to process HSM_ACTIONS llog (rc=%d)\n",
103                         mdt_obd_name(mdt), rc);
104         else
105                 rc = 0;
106
107         llog_ctxt_put(lctxt);
108         up(&cdt->cdt_llog_lock);
109         RETURN(rc);
110 }
111
112 /**
113  * add an entry in agent llog
114  * \param env [IN] environment
115  * \param mdt [IN] PDT device
116  * \param compound_id [IN] global id associated with the record
117  * \param archive_id [IN] backend archive number
118  * \param hai [IN] record to register
119  * \retval 0 success
120  * \retval -ve failure
121  */
122 int mdt_agent_record_add(const struct lu_env *env,
123                          struct mdt_device *mdt,
124                          __u64 compound_id, __u32 archive_id,
125                          __u64 flags, struct hsm_action_item *hai)
126 {
127         struct obd_device               *obd = mdt2obd_dev(mdt);
128         struct coordinator              *cdt = &mdt->mdt_coordinator;
129         struct llog_ctxt                *lctxt = NULL;
130         struct llog_agent_req_rec       *larr;
131         int                              rc;
132         int                              sz;
133         ENTRY;
134
135         sz = llog_data_len(sizeof(*larr) + hai->hai_len - sizeof(*hai));
136         OBD_ALLOC(larr, sz);
137         if (!larr)
138                 RETURN(-ENOMEM);
139         larr->arr_hdr.lrh_len = sz;
140         larr->arr_hdr.lrh_type = HSM_AGENT_REC;
141         larr->arr_status = ARS_WAITING;
142         larr->arr_compound_id = compound_id;
143         larr->arr_archive_id = archive_id;
144         larr->arr_flags = flags;
145         larr->arr_req_create = cfs_time_current_sec();
146         larr->arr_req_change = larr->arr_req_create;
147         memcpy(&larr->arr_hai, hai, hai->hai_len);
148
149         lctxt = llog_get_context(obd, LLOG_AGENT_ORIG_CTXT);
150         if ((lctxt == NULL) || (lctxt->loc_handle == NULL))
151                 GOTO(free, rc = -ENOENT);
152
153         down(&cdt->cdt_llog_lock);
154
155         /* in case of cancel request, the cookie is already set to the
156          * value of the request cookie to be cancelled
157          * so we do not change it */
158         if (hai->hai_action != HSMA_CANCEL) {
159                 cdt->cdt_last_cookie++;
160                 hai->hai_cookie = cdt->cdt_last_cookie;
161         }
162         larr->arr_hai.hai_cookie = hai->hai_cookie;
163         rc = llog_cat_add(env, lctxt->loc_handle, &larr->arr_hdr, NULL, NULL);
164         if (rc > 0)
165                 rc = 0;
166
167         up(&cdt->cdt_llog_lock);
168         llog_ctxt_put(lctxt);
169
170         EXIT;
171 free:
172         OBD_FREE(larr, sz);
173         return rc;
174 }
175
176 /**
177  * data passed to llog_cat_process() callback
178  * to find requests
179  */
180 struct data_update_cb {
181         struct mdt_device       *mdt;
182         __u64                   *cookies;
183         int                      cookies_count;
184         int                      cookies_done;
185         enum agent_req_status    status;
186         cfs_time_t               change_time;
187 };
188
189 /**
190  *  llog_cat_process() callback, used to update a record
191  * \param env [IN] environment
192  * \param llh [IN] llog handle
193  * \param hdr [IN] llog record
194  * \param data [IN] cb data = data_update_cb
195  * \retval 0 success
196  * \retval -ve failure
197  */
198 static int mdt_agent_record_update_cb(const struct lu_env *env,
199                                       struct llog_handle *llh,
200                                       struct llog_rec_hdr *hdr,
201                                       void *data)
202 {
203         struct llog_agent_req_rec       *larr;
204         struct data_update_cb           *ducb;
205         int                              rc, i;
206         int                              found;
207         ENTRY;
208
209         larr = (struct llog_agent_req_rec *)hdr;
210         ducb = data;
211         found = 0;
212
213         /* check if all done */
214         if (ducb->cookies_count == ducb->cookies_done)
215                 RETURN(LLOG_PROC_BREAK);
216
217         /* if record is in final state, never change */
218         /* if record is a cancel request, it cannot be canceled
219          * this is to manage the following case:
220          * when a request is canceled, we have 2 records with the
221          * the same cookie : the one to cancel and the cancel request
222          * the 1st has to be set to ARS_CANCELED and the 2nd to ARS_SUCCEED
223          */
224         if (agent_req_in_final_state(larr->arr_status) ||
225             ((larr->arr_hai.hai_action == HSMA_CANCEL) &&
226              (ducb->status == ARS_CANCELED)))
227                 RETURN(0);
228
229         rc = 0;
230         for (i = 0 ; i < ducb->cookies_count ; i++) {
231                 CDEBUG(D_HSM, "%s: search "LPX64", found "LPX64"\n",
232                        mdt_obd_name(ducb->mdt), ducb->cookies[i],
233                        larr->arr_hai.hai_cookie);
234                 if (larr->arr_hai.hai_cookie == ducb->cookies[i]) {
235
236                         larr->arr_status = ducb->status;
237                         larr->arr_req_change = ducb->change_time;
238                         rc = mdt_agent_llog_update_rec(env, ducb->mdt, llh,
239                                                        larr);
240                         ducb->cookies_done++;
241                         found = 1;
242                         break;
243                 }
244         }
245
246         if (rc < 0)
247                 CERROR("%s: mdt_agent_llog_update_rec() failed, rc = %d\n",
248                        mdt_obd_name(ducb->mdt), rc);
249
250         if (found == 1)
251                 RETURN(LLOG_DEL_RECORD);
252
253         RETURN(rc);
254 }
255
256 /**
257  * update an entry in agent llog
258  * \param env [IN] environment
259  * \param mdt [IN] MDT device
260  * \param cookie [IN] entries to update
261  *    log cookie are returned by register
262  * \param status [IN] new status of the request
263  * \retval 0 success
264  * \retval -ve failure
265  */
266 int mdt_agent_record_update(const struct lu_env *env, struct mdt_device *mdt,
267                             __u64 *cookies, int cookies_count,
268                             enum agent_req_status status)
269 {
270         struct data_update_cb    ducb;
271         int                      rc;
272         ENTRY;
273
274         ducb.mdt = mdt;
275         ducb.cookies = cookies;
276         ducb.cookies_count = cookies_count;
277         ducb.cookies_done = 0;
278         ducb.status = status;
279         ducb.change_time = cfs_time_current_sec();
280
281         rc = cdt_llog_process(env, mdt, mdt_agent_record_update_cb, &ducb);
282         if (rc < 0)
283                 CERROR("%s: cdt_llog_process() failed, rc=%d, cannot update "
284                        "status to %s for %d cookies, done %d\n",
285                        mdt_obd_name(mdt), rc,
286                        agent_req_status2name(status),
287                        cookies_count, ducb.cookies_done);
288         RETURN(rc);
289 }
290
291 /**
292  * update a llog record
293  *  cdt_llog_lock must be hold
294  * \param env [IN] environment
295  * \param mdt [IN] mdt device
296  * \param llh [IN] llog handle, must be a catalog handle
297  * \param larr [IN] record
298  * \retval 0 success
299  * \retval -ve failure
300  */
301 int mdt_agent_llog_update_rec(const struct lu_env *env,
302                               struct mdt_device *mdt, struct llog_handle *llh,
303                               struct llog_agent_req_rec *larr)
304 {
305         struct llog_rec_hdr      saved_hdr;
306         int                      rc;
307         ENTRY;
308
309         /* saved old record info */
310         saved_hdr = larr->arr_hdr;
311         /* add new record with updated values */
312         larr->arr_hdr.lrh_id = 0;
313         larr->arr_hdr.lrh_index = 0;
314         rc = llog_cat_add(env, llh->u.phd.phd_cat_handle, &larr->arr_hdr,
315                           NULL, NULL);
316         larr->arr_hdr = saved_hdr;
317         RETURN(rc);
318 }
319
320 /*
321  * Agent actions /proc seq_file methods
322  * As llog processing uses a callback for each entry, we cannot do a sequential
323  * read. To limit calls to llog_cat_process (it spawns a thread), we fill
324  * multiple record in seq_file buffer in one show call.
325  * op->start() sets the iterator up and returns the first element of sequence
326  * op->stop() shuts it down.
327  * op->next() returns the next element of sequence.
328  * op->show() prints element into the buffer.
329  * In case of error ->start() and ->next() return ERR_PTR(error)
330  * In the end of sequence they return %NULL
331  * op->show() returns 0 in case of success and negative number in case of error.
332  *
333  */
334 /**
335  * seq_file iterator for agent_action entry
336  */
337 #define AGENT_ACTIONS_IT_MAGIC 0x19660426
338 struct agent_action_iterator {
339         int                      aai_magic;      /**< magic number */
340         struct lu_env            aai_env;        /**< lustre env for llog */
341         struct obd_device       *aai_obd;        /**< metadata device */
342         struct llog_ctxt        *aai_ctxt;       /**< llog context */
343         struct llog_handle      *aai_llh;        /**< llog handle */
344         int                      aai_index_done; /**< idx already shown */
345         int                      aai_index_cb;   /**< current idx in loop cb */
346         int                      aai_eof;        /**< all done */
347 };
348
349 /**
350  * seq_file method called to start access to /proc file
351  * get llog context + llog handle
352  */
353 static void *mdt_agent_actions_proc_start(struct seq_file *s, loff_t *pos)
354 {
355         struct agent_action_iterator    *aai = s->private;
356         int                              rc;
357         ENTRY;
358
359         LASSERTF(aai->aai_magic == AGENT_ACTIONS_IT_MAGIC, "%08X",
360                  aai->aai_magic);
361
362         aai->aai_ctxt = llog_get_context(aai->aai_obd, LLOG_AGENT_ORIG_CTXT);
363         if (aai->aai_ctxt == NULL) {
364                 CERROR("llog_get_context() failed\n");
365                 RETURN(ERR_PTR(-ENOENT));
366         }
367         rc = llog_open(&aai->aai_env, aai->aai_ctxt, &aai->aai_llh, NULL,
368                        HSM_ACTIONS, LLOG_OPEN_EXISTS);
369         if (rc)
370                 GOTO(err, rc);
371
372         rc = llog_init_handle(&aai->aai_env, aai->aai_llh, LLOG_F_IS_CAT, NULL);
373         if (rc)
374                 GOTO(err, rc);
375
376         CDEBUG(D_HSM, "llog succesfully initialized, start from "LPD64"\n",
377                *pos);
378         /* first call = rewind */
379         if (*pos == 0) {
380                 aai->aai_index_done = 0;
381                 aai->aai_eof = 0;
382                 *pos = 1;
383         }
384
385         RETURN(aai);
386 err:
387         if (aai->aai_llh) {
388                 llog_cat_close(&aai->aai_env, aai->aai_llh);
389                 aai->aai_llh = NULL;
390         }
391
392         if (aai->aai_ctxt)
393                 llog_ctxt_put(aai->aai_ctxt);
394
395         RETURN(ERR_PTR(rc));
396 }
397
398 /**
399  * seq_file method called to get next item
400  * just returns NULL at eof
401  * (seq_file buffer filling is done in llog_cat_process() callback)
402  */
403 static void *mdt_agent_actions_proc_next(struct seq_file *s, void *v,
404                                          loff_t *pos)
405 {
406         struct agent_action_iterator *aai = s->private;
407         ENTRY;
408
409         LASSERTF(aai->aai_magic == AGENT_ACTIONS_IT_MAGIC, "%08X",
410                  aai->aai_magic);
411
412         CDEBUG(D_HSM, "set current="LPD64" to done=%d, eof=%d\n",
413                *pos, aai->aai_index_done, aai->aai_eof);
414         (*pos) = aai->aai_index_done;
415
416         if (aai->aai_eof)
417                 RETURN(NULL);
418
419         RETURN(aai);
420 }
421
422 /**
423  *  llog_cat_process() callback, used to fill a seq_file buffer
424  */
425 static int agent_actions_show_cb(const struct lu_env *env,
426                                  struct llog_handle *llh,
427                                  struct llog_rec_hdr *hdr,
428                                  void *data)
429 {
430         struct llog_agent_req_rec    *larr = (struct llog_agent_req_rec *)hdr;
431         struct seq_file              *s = data;
432         struct agent_action_iterator *aai;
433         int                           rc, sz;
434         char                          buf[12];
435         ENTRY;
436
437         aai = s->private;
438         LASSERTF(aai->aai_magic == AGENT_ACTIONS_IT_MAGIC, "%08X",
439                  aai->aai_magic);
440
441         aai->aai_index_cb++;
442         /* if rec already printed => skip */
443         if (aai->aai_index_cb <= aai->aai_index_done)
444                 RETURN(0);
445
446         sz = larr->arr_hai.hai_len - sizeof(larr->arr_hai);
447         rc = seq_printf(s, "lrh=[type=%X len=%d idx=%d] fid="DFID
448                         " dfid="DFID
449                         " compound/cookie="LPX64"/"LPX64
450                         " action=%s archive#=%d flags="LPX64
451                         " extent="LPX64"-"LPX64
452                         " gid="LPX64" datalen=%d status=%s"
453                         " data=[%s]\n",
454                         larr->arr_hdr.lrh_type,
455                         larr->arr_hdr.lrh_len, larr->arr_hdr.lrh_index,
456                         PFID(&larr->arr_hai.hai_fid),
457                         PFID(&larr->arr_hai.hai_dfid),
458                         larr->arr_compound_id, larr->arr_hai.hai_cookie,
459                         hsm_copytool_action2name(larr->arr_hai.hai_action),
460                         larr->arr_archive_id,
461                         larr->arr_flags,
462                         larr->arr_hai.hai_extent.offset,
463                         larr->arr_hai.hai_extent.length,
464                         larr->arr_hai.hai_gid, sz,
465                         agent_req_status2name(larr->arr_status),
466                         hai_dump_data_field(&larr->arr_hai, buf, sizeof(buf)));
467         if (rc >= 0) {
468                 aai->aai_index_done++;
469                 RETURN(0);
470         }
471         /* buffer is full, stop filling */
472         RETURN(LLOG_PROC_BREAK);
473 }
474
475 /**
476  * mdt_agent_actions_proc_show() is called at for each seq record
477  * process the llog, with a cb which fill the file_seq buffer
478  * to be faster, one show will fill multiple records
479  */
480 static int mdt_agent_actions_proc_show(struct seq_file *s, void *v)
481 {
482         struct agent_action_iterator    *aai = s->private;
483         int                              rc;
484         ENTRY;
485
486         LASSERTF(aai->aai_magic == AGENT_ACTIONS_IT_MAGIC, "%08X",
487                  aai->aai_magic);
488
489         CDEBUG(D_HSM, "show from done=%d, eof=%d\n",
490                aai->aai_index_done, aai->aai_eof);
491         if (aai->aai_eof)
492                 RETURN(0);
493
494         aai->aai_index_cb = 0;
495         rc = llog_cat_process(&aai->aai_env, aai->aai_llh,
496                               agent_actions_show_cb, s, 0, 0);
497         /* was all llog parsed? */
498         if (rc == 0)
499                 aai->aai_eof = 1;
500         /* not enough room in buffer? */
501         if (rc == LLOG_PROC_BREAK)
502                 RETURN(0);
503         /* error */
504         RETURN(rc);
505 }
506
507 /**
508  * seq_file method called to stop access to /proc file
509  * clean + put llog context
510  */
511 static void mdt_agent_actions_proc_stop(struct seq_file *s, void *v)
512 {
513         struct agent_action_iterator *aai = s->private;
514         ENTRY;
515
516         LASSERTF(aai->aai_magic == AGENT_ACTIONS_IT_MAGIC, "%08X",
517                  aai->aai_magic);
518
519         if (aai->aai_llh) {
520                 llog_cat_close(&aai->aai_env, aai->aai_llh);
521                 aai->aai_llh = NULL;
522         }
523         if (aai->aai_ctxt)
524                 llog_ctxt_put(aai->aai_ctxt);
525         EXIT;
526         return;
527 }
528
529 static const struct seq_operations mdt_agent_actions_proc_ops = {
530         .start  = mdt_agent_actions_proc_start,
531         .next   = mdt_agent_actions_proc_next,
532         .show   = mdt_agent_actions_proc_show,
533         .stop   = mdt_agent_actions_proc_stop,
534 };
535
536 static int lprocfs_open_agent_actions(struct inode *inode, struct file *file)
537 {
538         struct agent_action_iterator    *aai;
539         struct seq_file                 *s;
540         int                              rc;
541         struct mdt_device               *mdt;
542         ENTRY;
543
544         if (LPROCFS_ENTRY_AND_CHECK(PDE(inode)))
545                 RETURN(-ENOENT);
546
547         rc = seq_open(file, &mdt_agent_actions_proc_ops);
548         if (rc) {
549                 LPROCFS_EXIT();
550                 RETURN(rc);
551         }
552
553         OBD_ALLOC_PTR(aai);
554         if (aai == NULL)
555                 GOTO(err, rc = -ENOMEM);
556
557         aai->aai_magic = AGENT_ACTIONS_IT_MAGIC;
558         rc = lu_env_init(&aai->aai_env, LCT_LOCAL);
559         if (rc)
560                 GOTO(err, rc);
561
562         aai->aai_llh = NULL;
563         /* mdt is saved in proc_dir_entry->data by
564          * mdt_coordinator_procfs_init() calling lprocfs_register()
565          */
566         mdt = (struct mdt_device *)PDE(inode)->data;
567         aai->aai_obd = mdt2obd_dev(mdt);
568         s = file->private_data;
569         s->private = aai;
570
571         GOTO(out, rc = 0);
572
573 err:
574         lprocfs_seq_release(inode, file);
575         if (aai && aai->aai_env.le_ses)
576                 OBD_FREE_PTR(aai->aai_env.le_ses);
577         if (aai)
578                 OBD_FREE_PTR(aai);
579 out:
580         return rc;
581 }
582
583 /**
584  * lprocfs_release_agent_actions() is called at end of /proc access
585  * free alloacted ressources and call cleanup lprocfs methods
586  */
587 static int lprocfs_release_agent_actions(struct inode *inode, struct file *file)
588 {
589         struct seq_file                 *seq = file->private_data;
590         struct agent_action_iterator    *aai = seq->private;
591
592         if (aai) {
593                 lu_env_fini(&aai->aai_env);
594                 OBD_FREE_PTR(aai);
595         }
596
597         return lprocfs_seq_release(inode, file);
598 }
599
600 /* methods to access agent actions llog through /proc */
601 const struct file_operations mdt_agent_actions_fops = {
602         .owner          = THIS_MODULE,
603         .open           = lprocfs_open_agent_actions,
604         .read           = seq_read,
605         .llseek         = seq_lseek,
606         .release        = lprocfs_release_agent_actions,
607 };
608