--- /dev/null
+/*
+ * GPL HEADER START
+ *
+ * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License version 2 only,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License version 2 for more details. A copy is
+ * included in the COPYING file that accompanied this code.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * GPL HEADER END
+ */
+/*
+ * (C) Copyright 2012 Commissariat a l'energie atomique et aux energies
+ * alternatives
+ *
+ */
+/*
+ * lustre/mdt/mdt_agent_actions.c
+ *
+ * Lustre HSM
+ *
+ * Author: Jacques-Charles Lafoucriere <jacques-charles.lafoucriere@cea.fr>
+ * Author: Aurelien Degremont <aurelien.degremont@cea.fr>
+ */
+
+#define DEBUG_SUBSYSTEM S_MDS
+
+#include <obd_support.h>
+#include <lustre_net.h>
+#include <lustre_export.h>
+#include <obd.h>
+#include <lprocfs_status.h>
+#include <lustre_log.h>
+#include "mdt_internal.h"
+
+void dump_llog_agent_req_rec(char *prefix, struct llog_agent_req_rec *larr)
+{
+ char buf[12];
+ int sz;
+
+ sz = larr->arr_hai.hai_len - sizeof(larr->arr_hai);
+ CDEBUG(D_HSM, "%slrh=[type=%X len=%d idx=%d] fid="DFID
+ " dfid="DFID
+ " compound/cookie="LPX64"/"LPX64
+ " status=%s action=%s archive#=%d flags="LPX64
+ " create="LPU64" change="LPU64
+ " extent="LPX64"-"LPX64" gid="LPX64" datalen=%d"
+ " data=[%s]\n",
+ prefix,
+ larr->arr_hdr.lrh_type,
+ larr->arr_hdr.lrh_len, larr->arr_hdr.lrh_index,
+ PFID(&larr->arr_hai.hai_fid),
+ PFID(&larr->arr_hai.hai_dfid),
+ larr->arr_compound_id, larr->arr_hai.hai_cookie,
+ agent_req_status2name(larr->arr_status),
+ hsm_copytool_action2name(larr->arr_hai.hai_action),
+ larr->arr_archive_id,
+ larr->arr_flags,
+ larr->arr_req_create, larr->arr_req_change,
+ larr->arr_hai.hai_extent.offset,
+ larr->arr_hai.hai_extent.length,
+ larr->arr_hai.hai_gid, sz,
+ hai_dump_data_field(&larr->arr_hai, buf, sizeof(buf)));
+}
+
+/*
+ * process the actions llog
+ * \param env [IN] environment
+ * \param mdt [IN] MDT device
+ * \param cb [IN] llog callback funtion
+ * \param data [IN] llog callback data
+ * \retval 0 success
+ * \retval -ve failure
+ */
+int cdt_llog_process(const struct lu_env *env, struct mdt_device *mdt,
+ llog_cb_t cb, void *data)
+{
+ struct obd_device *obd = mdt2obd_dev(mdt);
+ struct llog_ctxt *lctxt = NULL;
+ struct coordinator *cdt = &mdt->mdt_coordinator;
+ int rc;
+ ENTRY;
+
+ lctxt = llog_get_context(obd, LLOG_AGENT_ORIG_CTXT);
+ if ((lctxt == NULL) || (lctxt->loc_handle == NULL))
+ RETURN(-ENOENT);
+
+ down(&cdt->cdt_llog_lock);
+
+ rc = llog_cat_process(env, lctxt->loc_handle, cb, data, 0, 0);
+ if (rc < 0)
+ CERROR("%s: failed to process HSM_ACTIONS llog (rc=%d)\n",
+ mdt_obd_name(mdt), rc);
+ else
+ rc = 0;
+
+ llog_ctxt_put(lctxt);
+ up(&cdt->cdt_llog_lock);
+ RETURN(rc);
+}
+
+/**
+ * add an entry in agent llog
+ * \param env [IN] environment
+ * \param mdt [IN] PDT device
+ * \param compound_id [IN] global id associated with the record
+ * \param archive_id [IN] backend archive number
+ * \param hai [IN] record to register
+ * \retval 0 success
+ * \retval -ve failure
+ */
+int mdt_agent_record_add(const struct lu_env *env,
+ struct mdt_device *mdt,
+ __u64 compound_id, __u32 archive_id,
+ __u64 flags, struct hsm_action_item *hai)
+{
+ struct obd_device *obd = mdt2obd_dev(mdt);
+ struct coordinator *cdt = &mdt->mdt_coordinator;
+ struct llog_ctxt *lctxt = NULL;
+ struct llog_agent_req_rec *larr;
+ int rc;
+ int sz;
+ ENTRY;
+
+ sz = llog_data_len(sizeof(*larr) + hai->hai_len - sizeof(*hai));
+ OBD_ALLOC(larr, sz);
+ if (!larr)
+ RETURN(-ENOMEM);
+ larr->arr_hdr.lrh_len = sz;
+ larr->arr_hdr.lrh_type = HSM_AGENT_REC;
+ larr->arr_status = ARS_WAITING;
+ larr->arr_compound_id = compound_id;
+ larr->arr_archive_id = archive_id;
+ larr->arr_flags = flags;
+ larr->arr_req_create = cfs_time_current_sec();
+ larr->arr_req_change = larr->arr_req_create;
+ memcpy(&larr->arr_hai, hai, hai->hai_len);
+
+ lctxt = llog_get_context(obd, LLOG_AGENT_ORIG_CTXT);
+ if ((lctxt == NULL) || (lctxt->loc_handle == NULL))
+ GOTO(free, rc = -ENOENT);
+
+ down(&cdt->cdt_llog_lock);
+
+ /* in case of cancel request, the cookie is already set to the
+ * value of the request cookie to be cancelled
+ * so we do not change it */
+ if (hai->hai_action != HSMA_CANCEL) {
+ cdt->cdt_last_cookie++;
+ hai->hai_cookie = cdt->cdt_last_cookie;
+ }
+ larr->arr_hai.hai_cookie = hai->hai_cookie;
+ rc = llog_cat_add(env, lctxt->loc_handle, &larr->arr_hdr, NULL, NULL);
+ if (rc > 0)
+ rc = 0;
+
+ up(&cdt->cdt_llog_lock);
+ llog_ctxt_put(lctxt);
+
+ EXIT;
+free:
+ OBD_FREE(larr, sz);
+ return rc;
+}
+
+/**
+ * data passed to llog_cat_process() callback
+ * to find requests
+ */
+struct data_update_cb {
+ struct mdt_device *mdt;
+ __u64 *cookies;
+ int cookies_count;
+ int cookies_done;
+ enum agent_req_status status;
+ cfs_time_t change_time;
+};
+
+/**
+ * llog_cat_process() callback, used to update a record
+ * \param env [IN] environment
+ * \param llh [IN] llog handle
+ * \param hdr [IN] llog record
+ * \param data [IN] cb data = data_update_cb
+ * \retval 0 success
+ * \retval -ve failure
+ */
+static int mdt_agent_record_update_cb(const struct lu_env *env,
+ struct llog_handle *llh,
+ struct llog_rec_hdr *hdr,
+ void *data)
+{
+ struct llog_agent_req_rec *larr;
+ struct data_update_cb *ducb;
+ int rc, i;
+ int found;
+ ENTRY;
+
+ larr = (struct llog_agent_req_rec *)hdr;
+ ducb = data;
+ found = 0;
+
+ /* check if all done */
+ if (ducb->cookies_count == ducb->cookies_done)
+ RETURN(LLOG_PROC_BREAK);
+
+ /* if record is in final state, never change */
+ /* if record is a cancel request, it cannot be canceled
+ * this is to manage the following case:
+ * when a request is canceled, we have 2 records with the
+ * the same cookie : the one to cancel and the cancel request
+ * the 1st has to be set to ARS_CANCELED and the 2nd to ARS_SUCCEED
+ */
+ if (agent_req_in_final_state(larr->arr_status) ||
+ ((larr->arr_hai.hai_action == HSMA_CANCEL) &&
+ (ducb->status == ARS_CANCELED)))
+ RETURN(0);
+
+ rc = 0;
+ for (i = 0 ; i < ducb->cookies_count ; i++) {
+ CDEBUG(D_HSM, "%s: search "LPX64", found "LPX64"\n",
+ mdt_obd_name(ducb->mdt), ducb->cookies[i],
+ larr->arr_hai.hai_cookie);
+ if (larr->arr_hai.hai_cookie == ducb->cookies[i]) {
+
+ larr->arr_status = ducb->status;
+ larr->arr_req_change = ducb->change_time;
+ rc = mdt_agent_llog_update_rec(env, ducb->mdt, llh,
+ larr);
+ ducb->cookies_done++;
+ found = 1;
+ break;
+ }
+ }
+
+ if (rc < 0)
+ CERROR("%s: mdt_agent_llog_update_rec() failed, rc = %d\n",
+ mdt_obd_name(ducb->mdt), rc);
+
+ if (found == 1)
+ RETURN(LLOG_DEL_RECORD);
+
+ RETURN(rc);
+}
+
+/**
+ * update an entry in agent llog
+ * \param env [IN] environment
+ * \param mdt [IN] MDT device
+ * \param cookie [IN] entries to update
+ * log cookie are returned by register
+ * \param status [IN] new status of the request
+ * \retval 0 success
+ * \retval -ve failure
+ */
+int mdt_agent_record_update(const struct lu_env *env, struct mdt_device *mdt,
+ __u64 *cookies, int cookies_count,
+ enum agent_req_status status)
+{
+ struct data_update_cb ducb;
+ int rc;
+ ENTRY;
+
+ ducb.mdt = mdt;
+ ducb.cookies = cookies;
+ ducb.cookies_count = cookies_count;
+ ducb.cookies_done = 0;
+ ducb.status = status;
+ ducb.change_time = cfs_time_current_sec();
+
+ rc = cdt_llog_process(env, mdt, mdt_agent_record_update_cb, &ducb);
+ if (rc < 0)
+ CERROR("%s: cdt_llog_process() failed, rc=%d, cannot update "
+ "status to %s for %d cookies, done %d\n",
+ mdt_obd_name(mdt), rc,
+ agent_req_status2name(status),
+ cookies_count, ducb.cookies_done);
+ RETURN(rc);
+}
+
+/**
+ * update a llog record
+ * cdt_llog_lock must be hold
+ * \param env [IN] environment
+ * \param mdt [IN] mdt device
+ * \param llh [IN] llog handle, must be a catalog handle
+ * \param larr [IN] record
+ * \retval 0 success
+ * \retval -ve failure
+ */
+int mdt_agent_llog_update_rec(const struct lu_env *env,
+ struct mdt_device *mdt, struct llog_handle *llh,
+ struct llog_agent_req_rec *larr)
+{
+ struct llog_rec_hdr saved_hdr;
+ int rc;
+ ENTRY;
+
+ /* saved old record info */
+ saved_hdr = larr->arr_hdr;
+ /* add new record with updated values */
+ larr->arr_hdr.lrh_id = 0;
+ larr->arr_hdr.lrh_index = 0;
+ rc = llog_cat_add(env, llh->u.phd.phd_cat_handle, &larr->arr_hdr,
+ NULL, NULL);
+ larr->arr_hdr = saved_hdr;
+ RETURN(rc);
+}
+
+/*
+ * Agent actions /proc seq_file methods
+ * As llog processing uses a callback for each entry, we cannot do a sequential
+ * read. To limit calls to llog_cat_process (it spawns a thread), we fill
+ * multiple record in seq_file buffer in one show call.
+ * op->start() sets the iterator up and returns the first element of sequence
+ * op->stop() shuts it down.
+ * op->next() returns the next element of sequence.
+ * op->show() prints element into the buffer.
+ * In case of error ->start() and ->next() return ERR_PTR(error)
+ * In the end of sequence they return %NULL
+ * op->show() returns 0 in case of success and negative number in case of error.
+ *
+ */
+/**
+ * seq_file iterator for agent_action entry
+ */
+#define AGENT_ACTIONS_IT_MAGIC 0x19660426
+struct agent_action_iterator {
+ int aai_magic; /**< magic number */
+ struct lu_env aai_env; /**< lustre env for llog */
+ struct obd_device *aai_obd; /**< metadata device */
+ struct llog_ctxt *aai_ctxt; /**< llog context */
+ struct llog_handle *aai_llh; /**< llog handle */
+ int aai_index_done; /**< idx already shown */
+ int aai_index_cb; /**< current idx in loop cb */
+ int aai_eof; /**< all done */
+};
+
+/**
+ * seq_file method called to start access to /proc file
+ * get llog context + llog handle
+ */
+static void *mdt_agent_actions_proc_start(struct seq_file *s, loff_t *pos)
+{
+ struct agent_action_iterator *aai = s->private;
+ int rc;
+ ENTRY;
+
+ LASSERTF(aai->aai_magic == AGENT_ACTIONS_IT_MAGIC, "%08X",
+ aai->aai_magic);
+
+ aai->aai_ctxt = llog_get_context(aai->aai_obd, LLOG_AGENT_ORIG_CTXT);
+ if (aai->aai_ctxt == NULL) {
+ CERROR("llog_get_context() failed\n");
+ RETURN(ERR_PTR(-ENOENT));
+ }
+ rc = llog_open(&aai->aai_env, aai->aai_ctxt, &aai->aai_llh, NULL,
+ HSM_ACTIONS, LLOG_OPEN_EXISTS);
+ if (rc)
+ GOTO(err, rc);
+
+ rc = llog_init_handle(&aai->aai_env, aai->aai_llh, LLOG_F_IS_CAT, NULL);
+ if (rc)
+ GOTO(err, rc);
+
+ CDEBUG(D_HSM, "llog succesfully initialized, start from "LPD64"\n",
+ *pos);
+ /* first call = rewind */
+ if (*pos == 0) {
+ aai->aai_index_done = 0;
+ aai->aai_eof = 0;
+ *pos = 1;
+ }
+
+ RETURN(aai);
+err:
+ if (aai->aai_llh) {
+ llog_cat_close(&aai->aai_env, aai->aai_llh);
+ aai->aai_llh = NULL;
+ }
+
+ if (aai->aai_ctxt)
+ llog_ctxt_put(aai->aai_ctxt);
+
+ RETURN(ERR_PTR(rc));
+}
+
+/**
+ * seq_file method called to get next item
+ * just returns NULL at eof
+ * (seq_file buffer filling is done in llog_cat_process() callback)
+ */
+static void *mdt_agent_actions_proc_next(struct seq_file *s, void *v,
+ loff_t *pos)
+{
+ struct agent_action_iterator *aai = s->private;
+ ENTRY;
+
+ LASSERTF(aai->aai_magic == AGENT_ACTIONS_IT_MAGIC, "%08X",
+ aai->aai_magic);
+
+ CDEBUG(D_HSM, "set current="LPD64" to done=%d, eof=%d\n",
+ *pos, aai->aai_index_done, aai->aai_eof);
+ (*pos) = aai->aai_index_done;
+
+ if (aai->aai_eof)
+ RETURN(NULL);
+
+ RETURN(aai);
+}
+
+/**
+ * llog_cat_process() callback, used to fill a seq_file buffer
+ */
+static int agent_actions_show_cb(const struct lu_env *env,
+ struct llog_handle *llh,
+ struct llog_rec_hdr *hdr,
+ void *data)
+{
+ struct llog_agent_req_rec *larr = (struct llog_agent_req_rec *)hdr;
+ struct seq_file *s = data;
+ struct agent_action_iterator *aai;
+ int rc, sz;
+ char buf[12];
+ ENTRY;
+
+ aai = s->private;
+ LASSERTF(aai->aai_magic == AGENT_ACTIONS_IT_MAGIC, "%08X",
+ aai->aai_magic);
+
+ aai->aai_index_cb++;
+ /* if rec already printed => skip */
+ if (aai->aai_index_cb <= aai->aai_index_done)
+ RETURN(0);
+
+ sz = larr->arr_hai.hai_len - sizeof(larr->arr_hai);
+ rc = seq_printf(s, "lrh=[type=%X len=%d idx=%d] fid="DFID
+ " dfid="DFID
+ " compound/cookie="LPX64"/"LPX64
+ " action=%s archive#=%d flags="LPX64
+ " extent="LPX64"-"LPX64
+ " gid="LPX64" datalen=%d status=%s"
+ " data=[%s]\n",
+ larr->arr_hdr.lrh_type,
+ larr->arr_hdr.lrh_len, larr->arr_hdr.lrh_index,
+ PFID(&larr->arr_hai.hai_fid),
+ PFID(&larr->arr_hai.hai_dfid),
+ larr->arr_compound_id, larr->arr_hai.hai_cookie,
+ hsm_copytool_action2name(larr->arr_hai.hai_action),
+ larr->arr_archive_id,
+ larr->arr_flags,
+ larr->arr_hai.hai_extent.offset,
+ larr->arr_hai.hai_extent.length,
+ larr->arr_hai.hai_gid, sz,
+ agent_req_status2name(larr->arr_status),
+ hai_dump_data_field(&larr->arr_hai, buf, sizeof(buf)));
+ if (rc >= 0) {
+ aai->aai_index_done++;
+ RETURN(0);
+ }
+ /* buffer is full, stop filling */
+ RETURN(LLOG_PROC_BREAK);
+}
+
+/**
+ * mdt_agent_actions_proc_show() is called at for each seq record
+ * process the llog, with a cb which fill the file_seq buffer
+ * to be faster, one show will fill multiple records
+ */
+static int mdt_agent_actions_proc_show(struct seq_file *s, void *v)
+{
+ struct agent_action_iterator *aai = s->private;
+ int rc;
+ ENTRY;
+
+ LASSERTF(aai->aai_magic == AGENT_ACTIONS_IT_MAGIC, "%08X",
+ aai->aai_magic);
+
+ CDEBUG(D_HSM, "show from done=%d, eof=%d\n",
+ aai->aai_index_done, aai->aai_eof);
+ if (aai->aai_eof)
+ RETURN(0);
+
+ aai->aai_index_cb = 0;
+ rc = llog_cat_process(&aai->aai_env, aai->aai_llh,
+ agent_actions_show_cb, s, 0, 0);
+ /* was all llog parsed? */
+ if (rc == 0)
+ aai->aai_eof = 1;
+ /* not enough room in buffer? */
+ if (rc == LLOG_PROC_BREAK)
+ RETURN(0);
+ /* error */
+ RETURN(rc);
+}
+
+/**
+ * seq_file method called to stop access to /proc file
+ * clean + put llog context
+ */
+static void mdt_agent_actions_proc_stop(struct seq_file *s, void *v)
+{
+ struct agent_action_iterator *aai = s->private;
+ ENTRY;
+
+ LASSERTF(aai->aai_magic == AGENT_ACTIONS_IT_MAGIC, "%08X",
+ aai->aai_magic);
+
+ if (aai->aai_llh) {
+ llog_cat_close(&aai->aai_env, aai->aai_llh);
+ aai->aai_llh = NULL;
+ }
+ if (aai->aai_ctxt)
+ llog_ctxt_put(aai->aai_ctxt);
+ EXIT;
+ return;
+}
+
+static const struct seq_operations mdt_agent_actions_proc_ops = {
+ .start = mdt_agent_actions_proc_start,
+ .next = mdt_agent_actions_proc_next,
+ .show = mdt_agent_actions_proc_show,
+ .stop = mdt_agent_actions_proc_stop,
+};
+
+static int lprocfs_open_agent_actions(struct inode *inode, struct file *file)
+{
+ struct agent_action_iterator *aai;
+ struct seq_file *s;
+ int rc;
+ struct mdt_device *mdt;
+ ENTRY;
+
+ if (LPROCFS_ENTRY_AND_CHECK(PDE(inode)))
+ RETURN(-ENOENT);
+
+ rc = seq_open(file, &mdt_agent_actions_proc_ops);
+ if (rc) {
+ LPROCFS_EXIT();
+ RETURN(rc);
+ }
+
+ OBD_ALLOC_PTR(aai);
+ if (aai == NULL)
+ GOTO(err, rc = -ENOMEM);
+
+ aai->aai_magic = AGENT_ACTIONS_IT_MAGIC;
+ rc = lu_env_init(&aai->aai_env, LCT_LOCAL);
+ if (rc)
+ GOTO(err, rc);
+
+ aai->aai_llh = NULL;
+ /* mdt is saved in proc_dir_entry->data by
+ * mdt_coordinator_procfs_init() calling lprocfs_register()
+ */
+ mdt = (struct mdt_device *)PDE(inode)->data;
+ aai->aai_obd = mdt2obd_dev(mdt);
+ s = file->private_data;
+ s->private = aai;
+
+ GOTO(out, rc = 0);
+
+err:
+ lprocfs_seq_release(inode, file);
+ if (aai && aai->aai_env.le_ses)
+ OBD_FREE_PTR(aai->aai_env.le_ses);
+ if (aai)
+ OBD_FREE_PTR(aai);
+out:
+ return rc;
+}
+
+/**
+ * lprocfs_release_agent_actions() is called at end of /proc access
+ * free alloacted ressources and call cleanup lprocfs methods
+ */
+static int lprocfs_release_agent_actions(struct inode *inode, struct file *file)
+{
+ struct seq_file *seq = file->private_data;
+ struct agent_action_iterator *aai = seq->private;
+
+ if (aai) {
+ lu_env_fini(&aai->aai_env);
+ OBD_FREE_PTR(aai);
+ }
+
+ return lprocfs_seq_release(inode, file);
+}
+
+/* methods to access agent actions llog through /proc */
+const struct file_operations mdt_agent_actions_fops = {
+ .owner = THIS_MODULE,
+ .open = lprocfs_open_agent_actions,
+ .read = seq_read,
+ .llseek = seq_lseek,
+ .release = lprocfs_release_agent_actions,
+};
+