Whamcloud - gitweb
1617eeb24626062437b94477a607a9d6783bcea6
[fs/lustre-release.git] / lustre / mdt / mdt_coordinator.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2012, 2013, Intel Corporation.
24  * Use is subject to license terms.
25  * Copyright (c) 2011, 2012 Commissariat a l'energie atomique et aux energies
26  *                          alternatives
27  */
28 /*
29  * lustre/mdt/mdt_coordinator.c
30  *
31  * Lustre HSM Coordinator
32  *
33  * Author: Jacques-Charles Lafoucriere <jacques-charles.lafoucriere@cea.fr>
34  * Author: Aurelien Degremont <aurelien.degremont@cea.fr>
35  * Author: Thomas Leibovici <thomas.leibovici@cea.fr>
36  */
37
38 #define DEBUG_SUBSYSTEM S_MDS
39
40 #include <obd_support.h>
41 #include <lustre_net.h>
42 #include <lustre_export.h>
43 #include <obd.h>
44 #include <lprocfs_status.h>
45 #include <lustre_log.h>
46 #include "mdt_internal.h"
47
48 static struct lprocfs_seq_vars lprocfs_mdt_hsm_vars[];
49
50 /**
51  * get obj and HSM attributes on a fid
52  * \param mti [IN] context
53  * \param fid [IN] object fid
54  * \param hsm [OUT] HSM meta data
55  * \retval obj or error (-ENOENT if not found)
56  */
57 struct mdt_object *mdt_hsm_get_md_hsm(struct mdt_thread_info *mti,
58                                       const struct lu_fid *fid,
59                                       struct md_hsm *hsm)
60 {
61         struct md_attr          *ma;
62         struct mdt_object       *obj;
63         int                      rc;
64         ENTRY;
65
66         ma = &mti->mti_attr;
67         ma->ma_need = MA_HSM;
68         ma->ma_valid = 0;
69
70         /* find object by FID */
71         obj = mdt_object_find(mti->mti_env, mti->mti_mdt, fid);
72         if (IS_ERR(obj))
73                 RETURN(obj);
74
75         if (!mdt_object_exists(obj)) {
76                 /* no more object */
77                 mdt_object_put(mti->mti_env, obj);
78                 RETURN(ERR_PTR(-ENOENT));
79         }
80
81         rc = mdt_attr_get_complex(mti, obj, ma);
82         if (rc) {
83                 mdt_object_put(mti->mti_env, obj);
84                 RETURN(ERR_PTR(rc));
85         }
86
87         if (ma->ma_valid & MA_HSM)
88                 *hsm = ma->ma_hsm;
89         else
90                 memset(hsm, 0, sizeof(*hsm));
91         ma->ma_valid = 0;
92         RETURN(obj);
93 }
94
95 void mdt_hsm_dump_hal(int level, const char *prefix,
96                       struct hsm_action_list *hal)
97 {
98         int                      i, sz;
99         struct hsm_action_item  *hai;
100         char                     buf[12];
101
102         CDEBUG(level, "%s: HAL header: version %X count %d compound "LPX64
103                       " archive_id %d flags "LPX64"\n",
104                prefix, hal->hal_version, hal->hal_count,
105                hal->hal_compound_id, hal->hal_archive_id, hal->hal_flags);
106
107         hai = hai_first(hal);
108         for (i = 0; i < hal->hal_count; i++) {
109                 sz = hai->hai_len - sizeof(*hai);
110                 CDEBUG(level, "%s %d: fid="DFID" dfid="DFID
111                        " compound/cookie="LPX64"/"LPX64
112                        " action=%s extent="LPX64"-"LPX64" gid="LPX64
113                        " datalen=%d data=[%s]\n",
114                        prefix, i,
115                        PFID(&hai->hai_fid), PFID(&hai->hai_dfid),
116                        hal->hal_compound_id, hai->hai_cookie,
117                        hsm_copytool_action2name(hai->hai_action),
118                        hai->hai_extent.offset,
119                        hai->hai_extent.length,
120                        hai->hai_gid, sz,
121                        hai_dump_data_field(hai, buf, sizeof(buf)));
122                 hai = hai_next(hai);
123         }
124 }
125
126 /**
127  * data passed to llog_cat_process() callback
128  * to scan requests and take actions
129  */
130 struct hsm_scan_data {
131         struct mdt_thread_info          *mti;
132         char                             fs_name[MTI_NAME_MAXLEN+1];
133         /* request to be send to agents */
134         int                              request_sz;    /** allocated size */
135         int                              max_requests;  /** vector size */
136         int                              request_cnt;   /** used count */
137         struct {
138                 int                      hal_sz;
139                 int                      hal_used_sz;
140                 struct hsm_action_list  *hal;
141         } *request;
142         /* records to be canceled */
143         int                              max_cookie;    /** vector size */
144         int                              cookie_cnt;    /** used count */
145         __u64                           *cookies;
146 };
147
148 /**
149  *  llog_cat_process() callback, used to:
150  *  - find waiting request and start action
151  *  - purge canceled and done requests
152  * \param env [IN] environment
153  * \param llh [IN] llog handle
154  * \param hdr [IN] llog record
155  * \param data [IN/OUT] cb data = struct hsm_scan_data
156  * \retval 0 success
157  * \retval -ve failure
158  */
159 static int mdt_coordinator_cb(const struct lu_env *env,
160                               struct llog_handle *llh,
161                               struct llog_rec_hdr *hdr,
162                               void *data)
163 {
164         const struct llog_agent_req_rec *larr;
165         struct hsm_scan_data            *hsd;
166         struct hsm_action_item          *hai;
167         struct mdt_device               *mdt;
168         struct coordinator              *cdt;
169         int                              rc;
170         ENTRY;
171
172         hsd = data;
173         mdt = hsd->mti->mti_mdt;
174         cdt = &mdt->mdt_coordinator;
175
176         larr = (struct llog_agent_req_rec *)hdr;
177         dump_llog_agent_req_rec("mdt_coordinator_cb(): ", larr);
178         switch (larr->arr_status) {
179         case ARS_WAITING: {
180                 int i, empty_slot, found;
181
182                 /* Are agents full? */
183                 if (atomic_read(&cdt->cdt_request_count) ==
184                     cdt->cdt_max_requests)
185                         break;
186
187                 /* first search if the request if known in the list we have
188                  * build and if there is room in the request vector */
189                 empty_slot = -1;
190                 found = -1;
191                 for (i = 0; i < hsd->max_requests &&
192                             (empty_slot == -1 || found == -1); i++) {
193                         if (hsd->request[i].hal == NULL) {
194                                 empty_slot = i;
195                                 continue;
196                         }
197                         if (hsd->request[i].hal->hal_compound_id ==
198                                 larr->arr_compound_id) {
199                                 found = i;
200                                 continue;
201                         }
202                 }
203                 if (found == -1 && empty_slot == -1)
204                         /* unknown request and no more room for new request,
205                          * continue scan for to find other entries for
206                          * already found request
207                          */
208                         RETURN(0);
209
210                 if (found == -1) {
211                         struct hsm_action_list *hal;
212
213                         /* request is not already known */
214                         /* allocates hai vector size just needs to be large
215                          * enough */
216                         hsd->request[empty_slot].hal_sz =
217                                      sizeof(*hsd->request[empty_slot].hal) +
218                                      cfs_size_round(MTI_NAME_MAXLEN+1) +
219                                      2 * cfs_size_round(larr->arr_hai.hai_len);
220                         OBD_ALLOC(hal, hsd->request[empty_slot].hal_sz);
221                         if (!hal) {
222                                 CERROR("%s: Cannot allocate memory (%d o)"
223                                        "for compound "LPX64"\n",
224                                        mdt_obd_name(mdt),
225                                        hsd->request[i].hal_sz,
226                                        larr->arr_compound_id);
227                                 RETURN(-ENOMEM);
228                         }
229                         hal->hal_version = HAL_VERSION;
230                         strncpy(hal->hal_fsname, hsd->fs_name,
231                                 MTI_NAME_MAXLEN);
232                         hal->hal_fsname[MTI_NAME_MAXLEN] = '\0';
233                         hal->hal_compound_id = larr->arr_compound_id;
234                         hal->hal_archive_id = larr->arr_archive_id;
235                         hal->hal_flags = larr->arr_flags;
236                         hal->hal_count = 0;
237                         hsd->request[empty_slot].hal_used_sz = hal_size(hal);
238                         hsd->request[empty_slot].hal = hal;
239                         hsd->request_cnt++;
240                         found = empty_slot;
241                         hai = hai_first(hal);
242                 } else {
243                         /* request is known */
244                         /* we check if record archive num is the same as the
245                          * known request, if not we will serve it in multiple
246                          * time because we do not know if the agent can serve
247                          * multiple backend
248                          * a use case is a compound made of multiple restore
249                          * where the files are not archived in the same backend
250                          */
251                         if (larr->arr_archive_id !=
252                             hsd->request[found].hal->hal_archive_id)
253                                 RETURN(0);
254
255                         if (hsd->request[found].hal_sz <
256                             hsd->request[found].hal_used_sz +
257                              cfs_size_round(larr->arr_hai.hai_len)) {
258                                 /* Not enough room, need an extension */
259                                 void *hal_buffer;
260                                 int sz;
261
262                                 sz = 2 * hsd->request[found].hal_sz;
263                                 OBD_ALLOC(hal_buffer, sz);
264                                 if (!hal_buffer) {
265                                         CERROR("%s: Cannot allocate memory "
266                                                "(%d o) for compound "LPX64"\n",
267                                                mdt_obd_name(mdt), sz,
268                                                larr->arr_compound_id);
269                                         RETURN(-ENOMEM);
270                                 }
271                                 memcpy(hal_buffer, hsd->request[found].hal,
272                                        hsd->request[found].hal_used_sz);
273                                 OBD_FREE(hsd->request[found].hal,
274                                          hsd->request[found].hal_sz);
275                                 hsd->request[found].hal = hal_buffer;
276                                 hsd->request[found].hal_sz = sz;
277                         }
278                         hai = hai_first(hsd->request[found].hal);
279                         for (i = 0; i < hsd->request[found].hal->hal_count;
280                              i++)
281                                 hai = hai_next(hai);
282                 }
283                 memcpy(hai, &larr->arr_hai, larr->arr_hai.hai_len);
284                 hai->hai_cookie = larr->arr_hai.hai_cookie;
285                 hai->hai_gid = larr->arr_hai.hai_gid;
286
287                 hsd->request[found].hal_used_sz +=
288                                                    cfs_size_round(hai->hai_len);
289                 hsd->request[found].hal->hal_count++;
290                 break;
291         }
292         case ARS_STARTED: {
293                 struct cdt_agent_req *car;
294                 cfs_time_t last;
295
296                 /* we search for a running request
297                  * error may happen if coordinator crashes or stopped
298                  * with running request
299                  */
300                 car = mdt_cdt_find_request(cdt, larr->arr_hai.hai_cookie, NULL);
301                 if (car == NULL) {
302                         last = larr->arr_req_create;
303                 } else {
304                         last = car->car_req_update;
305                         mdt_cdt_put_request(car);
306                 }
307
308                 /* test if request too long, if yes cancel it
309                  * the same way the copy tool acknowledge a cancel request */
310                 if ((last + cdt->cdt_active_req_timeout)
311                      < cfs_time_current_sec()) {
312                         struct hsm_progress_kernel pgs;
313
314                         dump_llog_agent_req_rec("mdt_coordinator_cb(): "
315                                                 "request timeouted, start "
316                                                 "cleaning", larr);
317                         /* a too old cancel request just needs to be removed
318                          * this can happen, if copy tool does not support cancel
319                          * for other requests, we have to remove the running
320                          * request and notify the copytool
321                          */
322                         pgs.hpk_fid = larr->arr_hai.hai_fid;
323                         pgs.hpk_cookie = larr->arr_hai.hai_cookie;
324                         pgs.hpk_extent = larr->arr_hai.hai_extent;
325                         pgs.hpk_flags = HP_FLAG_COMPLETED;
326                         pgs.hpk_errval = ENOSYS;
327                         pgs.hpk_data_version = 0;
328                         /* update request state, but do not record in llog, to
329                          * avoid deadlock on cdt_llog_lock
330                          */
331                         rc = mdt_hsm_update_request_state(hsd->mti, &pgs, 0);
332                         if (rc)
333                                 CERROR("%s: Cannot cleanup timeouted request: "
334                                        DFID" for cookie "LPX64" action=%s\n",
335                                        mdt_obd_name(mdt),
336                                        PFID(&pgs.hpk_fid), pgs.hpk_cookie,
337                                        hsm_copytool_action2name(
338                                                      larr->arr_hai.hai_action));
339
340                         /* add the cookie to the list of record to be
341                          * canceled by caller */
342                         if (hsd->max_cookie == (hsd->cookie_cnt - 1)) {
343                                 __u64 *ptr, *old_ptr;
344                                 int old_sz, new_sz, new_cnt;
345
346                                 /* need to increase vector size */
347                                 old_sz = sizeof(__u64) * hsd->max_cookie;
348                                 old_ptr = hsd->cookies;
349
350                                 new_cnt = 2 * hsd->max_cookie;
351                                 new_sz = sizeof(__u64) * new_cnt;
352
353                                 OBD_ALLOC(ptr, new_sz);
354                                 if (!ptr) {
355                                         CERROR("%s: Cannot allocate memory "
356                                                "(%d o) for cookie vector\n",
357                                                mdt_obd_name(mdt), new_sz);
358                                         RETURN(-ENOMEM);
359                                 }
360                                 memcpy(ptr, hsd->cookies, old_sz);
361                                 hsd->cookies = ptr;
362                                 hsd->max_cookie = new_cnt;
363                                 OBD_FREE(old_ptr, old_sz);
364                         }
365                         hsd->cookies[hsd->cookie_cnt] =
366                                                        larr->arr_hai.hai_cookie;
367                         hsd->cookie_cnt++;
368                 }
369                 break;
370         }
371         case ARS_FAILED:
372         case ARS_CANCELED:
373         case ARS_SUCCEED:
374                 if ((larr->arr_req_change + cdt->cdt_grace_delay) <
375                     cfs_time_current_sec())
376                         RETURN(LLOG_DEL_RECORD);
377                 break;
378         }
379         RETURN(0);
380 }
381
382 /**
383  * create /proc entries for coordinator
384  * \param mdt [IN]
385  * \retval 0 success
386  * \retval -ve failure
387  */
388 int hsm_cdt_procfs_init(struct mdt_device *mdt)
389 {
390         struct coordinator      *cdt = &mdt->mdt_coordinator;
391         int                      rc = 0;
392         ENTRY;
393
394         /* init /proc entries, failure is not critical */
395         cdt->cdt_proc_dir = lprocfs_seq_register("hsm",
396                                              mdt2obd_dev(mdt)->obd_proc_entry,
397                                              lprocfs_mdt_hsm_vars, mdt);
398         if (IS_ERR(cdt->cdt_proc_dir)) {
399                 rc = PTR_ERR(cdt->cdt_proc_dir);
400                 CERROR("%s: Cannot create 'hsm' directory in mdt proc dir,"
401                        " rc=%d\n", mdt_obd_name(mdt), rc);
402                 cdt->cdt_proc_dir = NULL;
403                 RETURN(rc);
404         }
405
406         RETURN(0);
407 }
408
409 /**
410  * remove /proc entries for coordinator
411  * \param mdt [IN]
412  */
413 void  hsm_cdt_procfs_fini(struct mdt_device *mdt)
414 {
415         struct coordinator      *cdt = &mdt->mdt_coordinator;
416
417         LASSERT(cdt->cdt_state == CDT_STOPPED);
418         if (cdt->cdt_proc_dir != NULL)
419                 lprocfs_remove(&cdt->cdt_proc_dir);
420 }
421
422 /**
423  * get vector of hsm cdt /proc vars
424  * \param none
425  * \retval var vector
426  */
427 struct lprocfs_seq_vars *hsm_cdt_get_proc_vars(void)
428 {
429         return lprocfs_mdt_hsm_vars;
430 }
431
432 /**
433  * coordinator thread
434  * \param data [IN] obd device
435  * \retval 0 success
436  * \retval -ve failure
437  */
438 static int mdt_coordinator(void *data)
439 {
440         struct mdt_thread_info  *mti = data;
441         struct mdt_device       *mdt = mti->mti_mdt;
442         struct coordinator      *cdt = &mdt->mdt_coordinator;
443         struct hsm_scan_data     hsd = { 0 };
444         int                      rc = 0;
445         ENTRY;
446
447         cdt->cdt_thread.t_flags = SVC_RUNNING;
448         wake_up(&cdt->cdt_thread.t_ctl_waitq);
449
450         CDEBUG(D_HSM, "%s: coordinator thread starting, pid=%d\n",
451                mdt_obd_name(mdt), current_pid());
452
453         /* timeouted cookie vector initialization */
454         hsd.max_cookie = 0;
455         hsd.cookie_cnt = 0;
456         hsd.cookies = NULL;
457         /* we use a copy of cdt_max_requests in the cb, so if cdt_max_requests
458          * increases due to a change from /proc we do not overflow the
459          * hsd.request[] vector
460          */
461         hsd.max_requests = cdt->cdt_max_requests;
462         hsd.request_sz = hsd.max_requests * sizeof(*hsd.request);
463         OBD_ALLOC(hsd.request, hsd.request_sz);
464         if (!hsd.request)
465                 GOTO(out, rc = -ENOMEM);
466
467         hsd.mti = mti;
468         obd_uuid2fsname(hsd.fs_name, mdt_obd_name(mdt), MTI_NAME_MAXLEN);
469
470         while (1) {
471                 struct l_wait_info lwi;
472                 int i;
473
474                 lwi = LWI_TIMEOUT(cfs_time_seconds(cdt->cdt_loop_period),
475                                   NULL, NULL);
476                 l_wait_event(cdt->cdt_thread.t_ctl_waitq,
477                              (cdt->cdt_thread.t_flags &
478                               (SVC_STOPPING|SVC_EVENT)),
479                              &lwi);
480
481                 CDEBUG(D_HSM, "coordinator resumes\n");
482
483                 if (cdt->cdt_thread.t_flags & SVC_STOPPING ||
484                     cdt->cdt_state == CDT_STOPPING) {
485                         cdt->cdt_thread.t_flags &= ~SVC_STOPPING;
486                         rc = 0;
487                         break;
488                 }
489
490                 /* wake up before timeout, new work arrives */
491                 if (cdt->cdt_thread.t_flags & SVC_EVENT)
492                         cdt->cdt_thread.t_flags &= ~SVC_EVENT;
493
494                 /* if coordinator is suspended continue to wait */
495                 if (cdt->cdt_state == CDT_DISABLE) {
496                         CDEBUG(D_HSM, "disable state, coordinator sleeps\n");
497                         continue;
498                 }
499
500                 CDEBUG(D_HSM, "coordinator starts reading llog\n");
501
502                 if (hsd.max_requests != cdt->cdt_max_requests) {
503                         /* cdt_max_requests has changed,
504                          * we need to allocate a new buffer
505                          */
506                         OBD_FREE(hsd.request, hsd.request_sz);
507                         hsd.max_requests = cdt->cdt_max_requests;
508                         hsd.request_sz =
509                                    hsd.max_requests * sizeof(*hsd.request);
510                         OBD_ALLOC(hsd.request, hsd.request_sz);
511                         if (!hsd.request) {
512                                 rc = -ENOMEM;
513                                 break;
514                         }
515                 }
516
517                 /* create canceled cookie vector for an arbitrary size
518                  * if needed, vector will grow during llog scan
519                  */
520                 hsd.max_cookie = 10;
521                 hsd.cookie_cnt = 0;
522                 OBD_ALLOC(hsd.cookies, hsd.max_cookie * sizeof(__u64));
523                 if (!hsd.cookies) {
524                         rc = -ENOMEM;
525                         goto clean_cb_alloc;
526                 }
527                 hsd.request_cnt = 0;
528
529                 rc = cdt_llog_process(mti->mti_env, mdt,
530                                       mdt_coordinator_cb, &hsd);
531                 if (rc < 0)
532                         goto clean_cb_alloc;
533
534                 CDEBUG(D_HSM, "Found %d requests to send and %d"
535                               " requests to cancel\n",
536                        hsd.request_cnt, hsd.cookie_cnt);
537                 /* first we cancel llog records of the timeouted requests */
538                 if (hsd.cookie_cnt > 0) {
539                         rc = mdt_agent_record_update(mti->mti_env, mdt,
540                                                      hsd.cookies,
541                                                      hsd.cookie_cnt,
542                                                      ARS_CANCELED);
543                         if (rc)
544                                 CERROR("%s: mdt_agent_record_update() failed, "
545                                        "rc=%d, cannot update status to %s "
546                                        "for %d cookies\n",
547                                        mdt_obd_name(mdt), rc,
548                                        agent_req_status2name(ARS_CANCELED),
549                                        hsd.cookie_cnt);
550                 }
551
552                 if (list_empty(&cdt->cdt_agents)) {
553                         CDEBUG(D_HSM, "no agent available, "
554                                       "coordinator sleeps\n");
555                         goto clean_cb_alloc;
556                 }
557
558                 /* here hsd contains a list of requests to be started */
559                 for (i = 0; i < hsd.max_requests; i++) {
560                         struct hsm_action_list  *hal;
561                         struct hsm_action_item  *hai;
562                         __u64                   *cookies;
563                         int                      sz, j;
564                         enum agent_req_status    status;
565
566                         /* still room for work ? */
567                         if (atomic_read(&cdt->cdt_request_count) ==
568                             cdt->cdt_max_requests)
569                                 break;
570
571                         if (hsd.request[i].hal == NULL)
572                                 continue;
573
574                         /* found a request, we start it */
575                         /* kuc payload allocation so we avoid an additionnal
576                          * allocation in mdt_hsm_agent_send()
577                          */
578                         hal = kuc_alloc(hsd.request[i].hal_used_sz,
579                                         KUC_TRANSPORT_HSM, HMT_ACTION_LIST);
580                         if (IS_ERR(hal)) {
581                                 CERROR("%s: Cannot allocate memory (%d o) "
582                                        "for compound "LPX64"\n",
583                                        mdt_obd_name(mdt),
584                                        hsd.request[i].hal_used_sz,
585                                        hsd.request[i].hal->hal_compound_id);
586                                 continue;
587                         }
588                         memcpy(hal, hsd.request[i].hal,
589                                hsd.request[i].hal_used_sz);
590
591                         rc = mdt_hsm_agent_send(mti, hal, 0);
592                         /* if failure, we suppose it is temporary
593                          * if the copy tool failed to do the request
594                          * it has to use hsm_progress
595                          */
596                         status = (rc ? ARS_WAITING : ARS_STARTED);
597
598                         /* set up cookie vector to set records status
599                          * after copy tools start or failed
600                          */
601                         sz = hsd.request[i].hal->hal_count * sizeof(__u64);
602                         OBD_ALLOC(cookies, sz);
603                         if (cookies == NULL) {
604                                 CERROR("%s: Cannot allocate memory (%d o) "
605                                        "for cookies vector "LPX64"\n",
606                                        mdt_obd_name(mdt), sz,
607                                        hsd.request[i].hal->hal_compound_id);
608                                 kuc_free(hal, hsd.request[i].hal_used_sz);
609                                 continue;
610                         }
611                         hai = hai_first(hal);
612                         for (j = 0; j < hsd.request[i].hal->hal_count; j++) {
613                                 cookies[j] = hai->hai_cookie;
614                                 hai = hai_next(hai);
615                         }
616
617                         rc = mdt_agent_record_update(mti->mti_env, mdt, cookies,
618                                                 hsd.request[i].hal->hal_count,
619                                                 status);
620                         if (rc)
621                                 CERROR("%s: mdt_agent_record_update() failed, "
622                                        "rc=%d, cannot update status to %s "
623                                        "for %d cookies\n",
624                                        mdt_obd_name(mdt), rc,
625                                        agent_req_status2name(status),
626                                        hsd.request[i].hal->hal_count);
627
628                         OBD_FREE(cookies, sz);
629                         kuc_free(hal, hsd.request[i].hal_used_sz);
630                 }
631 clean_cb_alloc:
632                 /* free cookie vector allocated for/by callback */
633                 if (hsd.cookies) {
634                         OBD_FREE(hsd.cookies, hsd.max_cookie * sizeof(__u64));
635                         hsd.max_cookie = 0;
636                         hsd.cookie_cnt = 0;
637                         hsd.cookies = NULL;
638                 }
639
640                 /* free hal allocated by callback */
641                 for (i = 0; i < hsd.max_requests; i++) {
642                         if (hsd.request[i].hal) {
643                                 OBD_FREE(hsd.request[i].hal,
644                                          hsd.request[i].hal_sz);
645                                 hsd.request[i].hal_sz = 0;
646                                 hsd.request[i].hal = NULL;
647                                 hsd.request_cnt--;
648                         }
649                 }
650                 LASSERT(hsd.request_cnt == 0);
651
652                 /* reset callback data */
653                 memset(hsd.request, 0, hsd.request_sz);
654         }
655         EXIT;
656 out:
657         if (hsd.request)
658                 OBD_FREE(hsd.request, hsd.request_sz);
659
660         if (hsd.cookies)
661                 OBD_FREE(hsd.cookies, hsd.max_cookie * sizeof(__u64));
662
663         if (cdt->cdt_state == CDT_STOPPING) {
664                 /* request comes from /proc path, so we need to clean cdt
665                  * struct */
666                  mdt_hsm_cdt_stop(mdt);
667                  mdt->mdt_opts.mo_coordinator = 0;
668         } else {
669                 /* request comes from a thread event, generated
670                  * by mdt_stop_coordinator(), we have to ack
671                  * and cdt cleaning will be done by event sender
672                  */
673                 cdt->cdt_thread.t_flags = SVC_STOPPED;
674                 wake_up(&cdt->cdt_thread.t_ctl_waitq);
675         }
676
677         if (rc != 0)
678                 CERROR("%s: coordinator thread exiting, process=%d, rc=%d\n",
679                        mdt_obd_name(mdt), current_pid(), rc);
680         else
681                 CDEBUG(D_HSM, "%s: coordinator thread exiting, process=%d,"
682                               " no error\n",
683                        mdt_obd_name(mdt), current_pid());
684
685         return rc;
686 }
687
688 /**
689  * lookup a restore handle by FID
690  * caller needs to hold cdt_restore_lock
691  * \param cdt [IN] coordinator
692  * \param fid [IN] FID
693  * \retval cdt_restore_handle found
694  * \retval NULL not found
695  */
696 static struct cdt_restore_handle *hsm_restore_hdl_find(struct coordinator *cdt,
697                                                        const struct lu_fid *fid)
698 {
699         struct cdt_restore_handle       *crh;
700         ENTRY;
701
702         list_for_each_entry(crh, &cdt->cdt_restore_hdl, crh_list) {
703                 if (lu_fid_eq(&crh->crh_fid, fid))
704                         RETURN(crh);
705         }
706         RETURN(NULL);
707 }
708
/**
 * private data given to the llog_cat_process() callback
 * hsm_restore_cb(), which scans the requests and takes actions
 */
struct hsm_restore_data {
        /* thread info of the caller, the callback reads mti_mdt and
         * mti_env from it */
        struct mdt_thread_info  *hrd_mti;
};
716
717 /**
718  *  llog_cat_process() callback, used to:
719  *  - find restore request and allocate the restore handle
720  * \param env [IN] environment
721  * \param llh [IN] llog handle
722  * \param hdr [IN] llog record
723  * \param data [IN/OUT] cb data = struct hsm_restore_data
724  * \retval 0 success
725  * \retval -ve failure
726  */
727 static int hsm_restore_cb(const struct lu_env *env,
728                           struct llog_handle *llh,
729                           struct llog_rec_hdr *hdr, void *data)
730 {
731         struct llog_agent_req_rec       *larr;
732         struct hsm_restore_data         *hrd;
733         struct cdt_restore_handle       *crh;
734         struct hsm_action_item          *hai;
735         struct mdt_thread_info          *mti;
736         struct coordinator              *cdt;
737         struct mdt_object               *child;
738         int rc;
739         ENTRY;
740
741         hrd = data;
742         mti = hrd->hrd_mti;
743         cdt = &mti->mti_mdt->mdt_coordinator;
744
745         larr = (struct llog_agent_req_rec *)hdr;
746         hai = &larr->arr_hai;
747         if (hai->hai_cookie > cdt->cdt_last_cookie)
748                 /* update the cookie to avoid collision */
749                 cdt->cdt_last_cookie = hai->hai_cookie + 1;
750
751         if (hai->hai_action != HSMA_RESTORE ||
752             agent_req_in_final_state(larr->arr_status))
753                 RETURN(0);
754
755         /* restore request not in a final state */
756
757         OBD_SLAB_ALLOC_PTR(crh, mdt_hsm_cdt_kmem);
758         if (crh == NULL)
759                 RETURN(-ENOMEM);
760
761         crh->crh_fid = hai->hai_fid;
762         /* in V1 all file is restored
763         crh->extent.start = hai->hai_extent.offset;
764         crh->extent.end = hai->hai_extent.offset + hai->hai_extent.length;
765         */
766         crh->crh_extent.start = 0;
767         crh->crh_extent.end = hai->hai_extent.length;
768         /* get the layout lock */
769         mdt_lock_reg_init(&crh->crh_lh, LCK_EX);
770         child = mdt_object_find_lock(mti, &crh->crh_fid, &crh->crh_lh,
771                                      MDS_INODELOCK_LAYOUT);
772         if (IS_ERR(child))
773                 GOTO(out, rc = PTR_ERR(child));
774
775         rc = 0;
776         /* we choose to not keep a reference
777          * on the object during the restore time which can be very long */
778         mdt_object_put(mti->mti_env, child);
779
780         mutex_lock(&cdt->cdt_restore_lock);
781         list_add_tail(&crh->crh_list, &cdt->cdt_restore_hdl);
782         mutex_unlock(&cdt->cdt_restore_lock);
783
784 out:
785         RETURN(rc);
786 }
787
788 /**
789  * restore coordinator state at startup
790  * the goal is to take a layout lock for each registered restore request
791  * \param mti [IN] context
792  */
793 static int mdt_hsm_pending_restore(struct mdt_thread_info *mti)
794 {
795         struct hsm_restore_data  hrd;
796         int                      rc;
797         ENTRY;
798
799         hrd.hrd_mti = mti;
800
801         rc = cdt_llog_process(mti->mti_env, mti->mti_mdt,
802                               hsm_restore_cb, &hrd);
803
804         RETURN(rc);
805 }
806
807 static int hsm_init_ucred(struct lu_ucred *uc)
808 {
809         ENTRY;
810
811         uc->uc_valid = UCRED_OLD;
812         uc->uc_o_uid = 0;
813         uc->uc_o_gid = 0;
814         uc->uc_o_fsuid = 0;
815         uc->uc_o_fsgid = 0;
816         uc->uc_uid = 0;
817         uc->uc_gid = 0;
818         uc->uc_fsuid = 0;
819         uc->uc_fsgid = 0;
820         uc->uc_suppgids[0] = -1;
821         uc->uc_suppgids[1] = -1;
822         uc->uc_cap = CFS_CAP_FS_MASK;
823         uc->uc_umask = 0777;
824         uc->uc_ginfo = NULL;
825         uc->uc_identity = NULL;
826
827         RETURN(0);
828 }
829
830 /**
831  * wake up coordinator thread
832  * \param mdt [IN] device
833  * \retval 0 success
834  * \retval -ve failure
835  */
836 int mdt_hsm_cdt_wakeup(struct mdt_device *mdt)
837 {
838         struct coordinator      *cdt = &mdt->mdt_coordinator;
839         ENTRY;
840
841         if (cdt->cdt_state == CDT_STOPPED)
842                 RETURN(-ESRCH);
843
844         /* wake up coordinator */
845         cdt->cdt_thread.t_flags = SVC_EVENT;
846         wake_up(&cdt->cdt_thread.t_ctl_waitq);
847
848         RETURN(0);
849 }
850
851 /**
852  * initialize coordinator struct
853  * \param mdt [IN] device
854  * \retval 0 success
855  * \retval -ve failure
856  */
857 int mdt_hsm_cdt_init(struct mdt_device *mdt)
858 {
859         struct coordinator      *cdt = &mdt->mdt_coordinator;
860         struct mdt_thread_info  *cdt_mti = NULL;
861         int                      rc;
862         ENTRY;
863
864         cdt->cdt_state = CDT_STOPPED;
865
866         init_waitqueue_head(&cdt->cdt_thread.t_ctl_waitq);
867         mutex_init(&cdt->cdt_llog_lock);
868         init_rwsem(&cdt->cdt_agent_lock);
869         init_rwsem(&cdt->cdt_request_lock);
870         mutex_init(&cdt->cdt_restore_lock);
871
872         CFS_INIT_LIST_HEAD(&cdt->cdt_requests);
873         CFS_INIT_LIST_HEAD(&cdt->cdt_agents);
874         CFS_INIT_LIST_HEAD(&cdt->cdt_restore_hdl);
875
876         rc = lu_env_init(&cdt->cdt_env, LCT_MD_THREAD);
877         if (rc < 0)
878                 RETURN(rc);
879
880         /* for mdt_ucred(), lu_ucred stored in lu_ucred_key */
881         rc = lu_context_init(&cdt->cdt_session, LCT_SERVER_SESSION);
882         if (rc == 0) {
883                 lu_context_enter(&cdt->cdt_session);
884                 cdt->cdt_env.le_ses = &cdt->cdt_session;
885         } else {
886                 lu_env_fini(&cdt->cdt_env);
887                 RETURN(rc);
888         }
889
890         cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
891         LASSERT(cdt_mti != NULL);
892
893         cdt_mti->mti_env = &cdt->cdt_env;
894         cdt_mti->mti_mdt = mdt;
895
896         hsm_init_ucred(mdt_ucred(cdt_mti));
897
898         /* default values for /proc tunnables
899          * can be override by MGS conf */
900         cdt->cdt_default_archive_id = 1;
901         cdt->cdt_grace_delay = 60;
902         cdt->cdt_loop_period = 10;
903         cdt->cdt_max_requests = 3;
904         cdt->cdt_policy = CDT_DEFAULT_POLICY;
905         cdt->cdt_active_req_timeout = 3600;
906
907         RETURN(0);
908 }
909
910 /**
911  * free a coordinator thread
912  * \param mdt [IN] device
913  */
914 int  mdt_hsm_cdt_fini(struct mdt_device *mdt)
915 {
916         struct coordinator *cdt = &mdt->mdt_coordinator;
917         ENTRY;
918
919         lu_context_exit(cdt->cdt_env.le_ses);
920         lu_context_fini(cdt->cdt_env.le_ses);
921
922         lu_env_fini(&cdt->cdt_env);
923
924         RETURN(0);
925 }
926
927 /**
928  * start a coordinator thread
929  * \param mdt [IN] device
930  * \retval 0 success
931  * \retval -ve failure
932  */
933 int mdt_hsm_cdt_start(struct mdt_device *mdt)
934 {
935         struct coordinator      *cdt = &mdt->mdt_coordinator;
936         int                      rc;
937         void                    *ptr;
938         struct mdt_thread_info  *cdt_mti;
939         struct task_struct      *task;
940         ENTRY;
941
942         /* functions defined but not yet used
943          * this avoid compilation warning
944          */
945         ptr = dump_requests;
946
947         if (cdt->cdt_state != CDT_STOPPED) {
948                 CERROR("%s: Coordinator already started\n",
949                        mdt_obd_name(mdt));
950                 RETURN(-EALREADY);
951         }
952
953         CLASSERT(1 << (CDT_POLICY_SHIFT_COUNT - 1) == CDT_POLICY_LAST);
954         cdt->cdt_policy = CDT_DEFAULT_POLICY;
955
956         cdt->cdt_state = CDT_INIT;
957
958         atomic_set(&cdt->cdt_compound_id, cfs_time_current_sec());
959         /* just need to be larger than previous one */
960         /* cdt_last_cookie is protected by cdt_llog_lock */
961         cdt->cdt_last_cookie = cfs_time_current_sec();
962         atomic_set(&cdt->cdt_request_count, 0);
963         cdt->cdt_user_request_mask = (1UL << HSMA_RESTORE);
964         cdt->cdt_group_request_mask = (1UL << HSMA_RESTORE);
965         cdt->cdt_other_request_mask = (1UL << HSMA_RESTORE);
966
967         /* to avoid deadlock when start is made through /proc
968          * /proc entries are created by the coordinator thread */
969
970         /* set up list of started restore requests */
971         cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
972         rc = mdt_hsm_pending_restore(cdt_mti);
973         if (rc)
974                 CERROR("%s: cannot take the layout locks needed"
975                        " for registered restore: %d\n",
976                        mdt_obd_name(mdt), rc);
977
978         task = kthread_run(mdt_coordinator, cdt_mti, "hsm_cdtr");
979         if (IS_ERR(task)) {
980                 rc = PTR_ERR(task);
981                 cdt->cdt_state = CDT_STOPPED;
982                 CERROR("%s: error starting coordinator thread: %d\n",
983                        mdt_obd_name(mdt), rc);
984                 RETURN(rc);
985         } else {
986                 CDEBUG(D_HSM, "%s: coordinator thread started\n",
987                        mdt_obd_name(mdt));
988                 rc = 0;
989         }
990
991         wait_event(cdt->cdt_thread.t_ctl_waitq,
992                        (cdt->cdt_thread.t_flags & SVC_RUNNING));
993
994         cdt->cdt_state = CDT_RUNNING;
995         mdt->mdt_opts.mo_coordinator = 1;
996         RETURN(0);
997 }
998
999 /**
1000  * stop a coordinator thread
1001  * \param mdt [IN] device
1002  */
1003 int mdt_hsm_cdt_stop(struct mdt_device *mdt)
1004 {
1005         struct coordinator              *cdt = &mdt->mdt_coordinator;
1006         struct cdt_agent_req            *car, *tmp1;
1007         struct hsm_agent                *ha, *tmp2;
1008         struct cdt_restore_handle       *crh, *tmp3;
1009         struct mdt_thread_info          *cdt_mti;
1010         ENTRY;
1011
1012         if (cdt->cdt_state == CDT_STOPPED) {
1013                 CERROR("%s: Coordinator already stopped\n",
1014                        mdt_obd_name(mdt));
1015                 RETURN(-EALREADY);
1016         }
1017
1018         if (cdt->cdt_state != CDT_STOPPING) {
1019                 /* stop coordinator thread before cleaning */
1020                 cdt->cdt_thread.t_flags = SVC_STOPPING;
1021                 wake_up(&cdt->cdt_thread.t_ctl_waitq);
1022                 wait_event(cdt->cdt_thread.t_ctl_waitq,
1023                            cdt->cdt_thread.t_flags & SVC_STOPPED);
1024         }
1025         cdt->cdt_state = CDT_STOPPED;
1026
1027         /* start cleaning */
1028         down_write(&cdt->cdt_request_lock);
1029         list_for_each_entry_safe(car, tmp1, &cdt->cdt_requests,
1030                                  car_request_list) {
1031                 list_del(&car->car_request_list);
1032                 mdt_cdt_free_request(car);
1033         }
1034         up_write(&cdt->cdt_request_lock);
1035
1036         down_write(&cdt->cdt_agent_lock);
1037         list_for_each_entry_safe(ha, tmp2, &cdt->cdt_agents, ha_list) {
1038                 list_del(&ha->ha_list);
1039                 OBD_FREE_PTR(ha);
1040         }
1041         up_write(&cdt->cdt_agent_lock);
1042
1043         cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
1044         mutex_lock(&cdt->cdt_restore_lock);
1045         list_for_each_entry_safe(crh, tmp3, &cdt->cdt_restore_hdl, crh_list) {
1046                 struct mdt_object       *child;
1047
1048                 /* give back layout lock */
1049                 child = mdt_object_find(&cdt->cdt_env, mdt, &crh->crh_fid);
1050                 if (!IS_ERR(child))
1051                         mdt_object_unlock_put(cdt_mti, child, &crh->crh_lh, 1);
1052
1053                 list_del(&crh->crh_list);
1054
1055                 OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
1056         }
1057         mutex_unlock(&cdt->cdt_restore_lock);
1058
1059         mdt->mdt_opts.mo_coordinator = 0;
1060
1061         RETURN(0);
1062 }
1063
1064 /**
1065  * register all requests from an hal in the memory list
1066  * \param mti [IN] context
1067  * \param hal [IN] request
1068  * \param uuid [OUT] in case of CANCEL, the uuid of the agent
1069  *  which is running the CT
1070  * \retval 0 success
1071  * \retval -ve failure
1072  */
1073 int mdt_hsm_add_hal(struct mdt_thread_info *mti,
1074                     struct hsm_action_list *hal, struct obd_uuid *uuid)
1075 {
1076         struct mdt_device       *mdt = mti->mti_mdt;
1077         struct coordinator      *cdt = &mdt->mdt_coordinator;
1078         struct hsm_action_item  *hai;
1079         int                      rc = 0, i;
1080         ENTRY;
1081
1082         /* register request in memory list */
1083         hai = hai_first(hal);
1084         for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
1085                 struct cdt_agent_req *car;
1086
1087                 /* in case of a cancel request, we first mark the ondisk
1088                  * record of the request we want to stop as canceled
1089                  * this does not change the cancel record
1090                  * it will be done when updating the request status
1091                  */
1092                 if (hai->hai_action == HSMA_CANCEL) {
1093                         rc = mdt_agent_record_update(mti->mti_env, mti->mti_mdt,
1094                                                      &hai->hai_cookie,
1095                                                      1, ARS_CANCELED);
1096                         if (rc) {
1097                                 CERROR("%s: mdt_agent_record_update() failed, "
1098                                        "rc=%d, cannot update status to %s "
1099                                        "for cookie "LPX64"\n",
1100                                        mdt_obd_name(mdt), rc,
1101                                        agent_req_status2name(ARS_CANCELED),
1102                                        hai->hai_cookie);
1103                                 GOTO(out, rc);
1104                         }
1105
1106                         /* find the running request to set it canceled */
1107                         car = mdt_cdt_find_request(cdt, hai->hai_cookie, NULL);
1108                         if (car != NULL) {
1109                                 car->car_canceled = 1;
1110                                 /* uuid has to be changed to the one running the
1111                                 * request to cancel */
1112                                 *uuid = car->car_uuid;
1113                                 mdt_cdt_put_request(car);
1114                         }
1115                         /* no need to memorize cancel request
1116                          * this also avoid a deadlock when we receive
1117                          * a purge all requests command
1118                          */
1119                         continue;
1120                 }
1121
1122                 if (hai->hai_action == HSMA_ARCHIVE) {
1123                         struct mdt_object *obj;
1124                         struct md_hsm hsm;
1125
1126                         obj = mdt_hsm_get_md_hsm(mti, &hai->hai_fid, &hsm);
1127                         if (IS_ERR(obj) && (PTR_ERR(obj) == -ENOENT))
1128                                 continue;
1129                         if (IS_ERR(obj))
1130                                 GOTO(out, rc = PTR_ERR(obj));
1131
1132                         hsm.mh_flags |= HS_EXISTS;
1133                         hsm.mh_arch_id = hal->hal_archive_id;
1134                         rc = mdt_hsm_attr_set(mti, obj, &hsm);
1135                         mdt_object_put(mti->mti_env, obj);
1136                         if (rc)
1137                                 GOTO(out, rc);
1138                 }
1139
1140                 car = mdt_cdt_alloc_request(hal->hal_compound_id,
1141                                             hal->hal_archive_id, hal->hal_flags,
1142                                             uuid, hai);
1143                 if (IS_ERR(car))
1144                         GOTO(out, rc = PTR_ERR(car));
1145
1146                 rc = mdt_cdt_add_request(cdt, car);
1147                 if (rc != 0)
1148                         mdt_cdt_free_request(car);
1149         }
1150 out:
1151         RETURN(rc);
1152 }
1153
1154 /**
1155  * swap layouts between 2 fids
1156  * \param mti [IN] context
1157  * \param fid1 [IN]
1158  * \param fid2 [IN]
1159  * \param mh_common [IN] MD HSM
1160  */
1161 static int hsm_swap_layouts(struct mdt_thread_info *mti,
1162                             const lustre_fid *fid, const lustre_fid *dfid,
1163                             struct md_hsm *mh_common)
1164 {
1165         struct mdt_device       *mdt = mti->mti_mdt;
1166         struct mdt_object       *child1, *child2;
1167         struct mdt_lock_handle  *lh2;
1168         int                      rc;
1169         ENTRY;
1170
1171         child1 = mdt_object_find(mti->mti_env, mdt, fid);
1172         if (IS_ERR(child1))
1173                 GOTO(out, rc = PTR_ERR(child1));
1174
1175         /* we already have layout lock on FID so take only
1176          * on dfid */
1177         lh2 = &mti->mti_lh[MDT_LH_OLD];
1178         mdt_lock_reg_init(lh2, LCK_EX);
1179         child2 = mdt_object_find_lock(mti, dfid, lh2, MDS_INODELOCK_LAYOUT);
1180         if (IS_ERR(child2))
1181                 GOTO(out_child1, rc = PTR_ERR(child2));
1182
1183         /* if copy tool closes the volatile before sending the final
1184          * progress through llapi_hsm_copy_end(), all the objects
1185          * are removed and mdd_swap_layout LBUG */
1186         if (!mdt_object_exists(child2)) {
1187                 CERROR("%s: Copytool has closed volatile file "DFID"\n",
1188                        mdt_obd_name(mti->mti_mdt), PFID(dfid));
1189                 GOTO(out_child2, rc = -ENOENT);
1190         }
1191         /* Since we only handle restores here, unconditionally use
1192          * SWAP_LAYOUTS_MDS_HSM flag to ensure original layout will
1193          * be preserved in case of failure during swap_layout and not
1194          * leave a file in an intermediate but incoherent state.
1195          * But need to setup HSM xattr of data FID before, reuse
1196          * mti and mh presets for FID in hsm_cdt_request_completed(),
1197          * only need to clear RELEASED and DIRTY.
1198          */
1199         mh_common->mh_flags &= ~(HS_RELEASED | HS_DIRTY);
1200         rc = mdt_hsm_attr_set(mti, child2, mh_common);
1201         if (rc == 0)
1202                 rc = mo_swap_layouts(mti->mti_env,
1203                                      mdt_object_child(child1),
1204                                      mdt_object_child(child2),
1205                                      SWAP_LAYOUTS_MDS_HSM);
1206
1207 out_child2:
1208         mdt_object_unlock_put(mti, child2, lh2, 1);
1209 out_child1:
1210         mdt_object_put(mti->mti_env, child1);
1211 out:
1212         RETURN(rc);
1213 }
1214
1215 /**
1216  * update status of a completed request
1217  * \param mti [IN] context
1218  * \param pgs [IN] progress of the copy tool
1219  * \param update_record [IN] update llog record
1220  * \retval 0 success
1221  * \retval -ve failure
1222  */
1223 static int hsm_cdt_request_completed(struct mdt_thread_info *mti,
1224                                      struct hsm_progress_kernel *pgs,
1225                                      const struct cdt_agent_req *car,
1226                                      enum agent_req_status *status)
1227 {
1228         const struct lu_env     *env = mti->mti_env;
1229         struct mdt_device       *mdt = mti->mti_mdt;
1230         struct coordinator      *cdt = &mdt->mdt_coordinator;
1231         struct mdt_object       *obj = NULL;
1232         int                      cl_flags = 0, rc = 0;
1233         struct md_hsm            mh;
1234         bool                     is_mh_changed;
1235         ENTRY;
1236
1237         /* default is to retry */
1238         *status = ARS_WAITING;
1239
1240         /* find object by FID */
1241         obj = mdt_hsm_get_md_hsm(mti, &car->car_hai->hai_fid, &mh);
1242         /* we will update MD HSM only if needed */
1243         is_mh_changed = false;
1244         if (IS_ERR(obj)) {
1245                 /* object removed */
1246                 *status = ARS_SUCCEED;
1247                 goto unlock;
1248         }
1249
1250         /* no need to change mh->mh_arch_id
1251          * mdt_hsm_get_md_hsm() got it from disk and it is still valid
1252          */
1253         if (pgs->hpk_errval != 0) {
1254                 switch (pgs->hpk_errval) {
1255                 case ENOSYS:
1256                         /* the copy tool does not support cancel
1257                          * so the cancel request is failed
1258                          * As we cannot distinguish a cancel progress
1259                          * from another action progress (they have the
1260                          * same cookie), we suppose here the CT returns
1261                          * ENOSYS only if does not support cancel
1262                          */
1263                         /* this can also happen when cdt calls it to
1264                          * for a timeouted request */
1265                         *status = ARS_FAILED;
1266                         /* to have a cancel event in changelog */
1267                         pgs->hpk_errval = ECANCELED;
1268                         break;
1269                 case ECANCELED:
1270                         /* the request record has already been set to
1271                          * ARS_CANCELED, this set the cancel request
1272                          * to ARS_SUCCEED */
1273                         *status = ARS_SUCCEED;
1274                         break;
1275                 default:
1276                         *status = (cdt->cdt_policy & CDT_NORETRY_ACTION ||
1277                                    !(pgs->hpk_flags & HP_FLAG_RETRY) ?
1278                                    ARS_FAILED : ARS_WAITING);
1279                         break;
1280                 }
1281
1282                 if (pgs->hpk_errval > CLF_HSM_MAXERROR) {
1283                         CERROR("%s: Request "LPX64" on "DFID
1284                                " failed, error code %d too large\n",
1285                                mdt_obd_name(mdt),
1286                                pgs->hpk_cookie, PFID(&pgs->hpk_fid),
1287                                pgs->hpk_errval);
1288                         hsm_set_cl_error(&cl_flags,
1289                                          CLF_HSM_ERROVERFLOW);
1290                         rc = -EINVAL;
1291                 } else {
1292                         hsm_set_cl_error(&cl_flags, pgs->hpk_errval);
1293                 }
1294
1295                 switch (car->car_hai->hai_action) {
1296                 case HSMA_ARCHIVE:
1297                         hsm_set_cl_event(&cl_flags, HE_ARCHIVE);
1298                         break;
1299                 case HSMA_RESTORE:
1300                         hsm_set_cl_event(&cl_flags, HE_RESTORE);
1301                         break;
1302                 case HSMA_REMOVE:
1303                         hsm_set_cl_event(&cl_flags, HE_REMOVE);
1304                         break;
1305                 case HSMA_CANCEL:
1306                         hsm_set_cl_event(&cl_flags, HE_CANCEL);
1307                         CERROR("%s: Failed request "LPX64" on "DFID
1308                                " cannot be a CANCEL\n",
1309                                mdt_obd_name(mdt),
1310                                pgs->hpk_cookie,
1311                                PFID(&pgs->hpk_fid));
1312                         break;
1313                 default:
1314                         CERROR("%s: Failed request "LPX64" on "DFID
1315                                " %d is an unknown action\n",
1316                                mdt_obd_name(mdt),
1317                                pgs->hpk_cookie, PFID(&pgs->hpk_fid),
1318                                car->car_hai->hai_action);
1319                         rc = -EINVAL;
1320                         break;
1321                 }
1322         } else {
1323                 *status = ARS_SUCCEED;
1324                 switch (car->car_hai->hai_action) {
1325                 case HSMA_ARCHIVE:
1326                         hsm_set_cl_event(&cl_flags, HE_ARCHIVE);
1327                         /* set ARCHIVE keep EXIST and clear LOST and
1328                          * DIRTY */
1329                         mh.mh_arch_ver = pgs->hpk_data_version;
1330                         mh.mh_flags |= HS_ARCHIVED;
1331                         mh.mh_flags &= ~(HS_LOST|HS_DIRTY);
1332                         is_mh_changed = true;
1333                         break;
1334                 case HSMA_RESTORE:
1335                         hsm_set_cl_event(&cl_flags, HE_RESTORE);
1336
1337                         /* do not clear RELEASED and DIRTY here
1338                          * this will occur in hsm_swap_layouts()
1339                          */
1340
1341                         /* Restoring has changed the file version on
1342                          * disk. */
1343                         mh.mh_arch_ver = pgs->hpk_data_version;
1344                         is_mh_changed = true;
1345                         break;
1346                 case HSMA_REMOVE:
1347                         hsm_set_cl_event(&cl_flags, HE_REMOVE);
1348                         /* clear ARCHIVED EXISTS and LOST */
1349                         mh.mh_flags &= ~(HS_ARCHIVED | HS_EXISTS | HS_LOST);
1350                         is_mh_changed = true;
1351                         break;
1352                 case HSMA_CANCEL:
1353                         hsm_set_cl_event(&cl_flags, HE_CANCEL);
1354                         CERROR("%s: Successful request "LPX64
1355                                " on "DFID
1356                                " cannot be a CANCEL\n",
1357                                mdt_obd_name(mdt),
1358                                pgs->hpk_cookie,
1359                                PFID(&pgs->hpk_fid));
1360                         break;
1361                 default:
1362                         CERROR("%s: Successful request "LPX64
1363                                " on "DFID
1364                                " %d is an unknown action\n",
1365                                mdt_obd_name(mdt),
1366                                pgs->hpk_cookie, PFID(&pgs->hpk_fid),
1367                                car->car_hai->hai_action);
1368                         rc = -EINVAL;
1369                         break;
1370                 }
1371         }
1372
1373         /* rc != 0 means error when analysing action, it may come from
1374          * a crasy CT no need to manage DIRTY
1375          */
1376         if (rc == 0)
1377                 hsm_set_cl_flags(&cl_flags,
1378                                  mh.mh_flags & HS_DIRTY ? CLF_HSM_DIRTY : 0);
1379
1380         /* unlock is done later, after layout lock management */
1381         if (is_mh_changed)
1382                 rc = mdt_hsm_attr_set(mti, obj, &mh);
1383
1384 unlock:
1385         /* we give back layout lock only if restore was successful or
1386          * if restore was canceled or if policy is to not retry
1387          * in other cases we just unlock the object */
1388         if (car->car_hai->hai_action == HSMA_RESTORE &&
1389             (pgs->hpk_errval == 0 || pgs->hpk_errval == ECANCELED ||
1390              cdt->cdt_policy & CDT_NORETRY_ACTION)) {
1391                 struct cdt_restore_handle       *crh;
1392
1393                 /* restore in data FID done, we swap the layouts
1394                  * only if restore is successfull */
1395                 if (pgs->hpk_errval == 0) {
1396                         rc = hsm_swap_layouts(mti, &car->car_hai->hai_fid,
1397                                               &car->car_hai->hai_dfid, &mh);
1398                         if (rc) {
1399                                 if (cdt->cdt_policy & CDT_NORETRY_ACTION)
1400                                         *status = ARS_FAILED;
1401                                 pgs->hpk_errval = -rc;
1402                         }
1403                 }
1404                 /* we have to retry, so keep layout lock */
1405                 if (*status == ARS_WAITING)
1406                         GOTO(out, rc);
1407
1408                 /* give back layout lock */
1409                 mutex_lock(&cdt->cdt_restore_lock);
1410                 crh = hsm_restore_hdl_find(cdt, &car->car_hai->hai_fid);
1411                 if (crh != NULL)
1412                         list_del(&crh->crh_list);
1413                 mutex_unlock(&cdt->cdt_restore_lock);
1414                 /* just give back layout lock, we keep
1415                  * the reference which is given back
1416                  * later with the lock for HSM flags */
1417                 if (!IS_ERR(obj) && crh != NULL)
1418                         mdt_object_unlock(mti, obj, &crh->crh_lh, 1);
1419
1420                 if (crh != NULL)
1421                         OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
1422         }
1423
1424         GOTO(out, rc);
1425
1426 out:
1427         if (obj != NULL && !IS_ERR(obj)) {
1428                 mo_changelog(env, CL_HSM, cl_flags,
1429                              mdt_object_child(obj));
1430                 mdt_object_put(mti->mti_env, obj);
1431         }
1432
1433         RETURN(rc);
1434 }
1435
1436 /**
1437  * update status of a request
1438  * \param mti [IN] context
1439  * \param pgs [IN] progress of the copy tool
1440  * \param update_record [IN] update llog record
1441  * \retval 0 success
1442  * \retval -ve failure
1443  */
1444 int mdt_hsm_update_request_state(struct mdt_thread_info *mti,
1445                                  struct hsm_progress_kernel *pgs,
1446                                  const int update_record)
1447 {
1448         struct mdt_device       *mdt = mti->mti_mdt;
1449         struct coordinator      *cdt = &mdt->mdt_coordinator;
1450         struct cdt_agent_req    *car;
1451         int                      rc = 0;
1452         ENTRY;
1453
1454         /* no coordinator started, so we cannot serve requests */
1455         if (cdt->cdt_state == CDT_STOPPED)
1456                 RETURN(-EAGAIN);
1457
1458         /* first do sanity checks */
1459         car = mdt_cdt_update_request(cdt, pgs);
1460         if (IS_ERR(car)) {
1461                 CERROR("%s: Cannot find running request for cookie "LPX64
1462                        " on fid="DFID"\n",
1463                        mdt_obd_name(mdt),
1464                        pgs->hpk_cookie, PFID(&pgs->hpk_fid));
1465                 if (car == NULL)
1466                         RETURN(-ENOENT);
1467                 RETURN(PTR_ERR(car));
1468         }
1469
1470         CDEBUG(D_HSM, "Progress received for fid="DFID" cookie="LPX64
1471                       " action=%s flags=%d err=%d fid="DFID" dfid="DFID"\n",
1472                       PFID(&pgs->hpk_fid), pgs->hpk_cookie,
1473                       hsm_copytool_action2name(car->car_hai->hai_action),
1474                       pgs->hpk_flags, pgs->hpk_errval,
1475                       PFID(&car->car_hai->hai_fid),
1476                       PFID(&car->car_hai->hai_dfid));
1477
1478         /* progress is done on FID or data FID depending of the action and
1479          * of the copy progress */
1480         /* for restore progress is used to send back the data FID to cdt */
1481         if (car->car_hai->hai_action == HSMA_RESTORE &&
1482             lu_fid_eq(&car->car_hai->hai_fid, &car->car_hai->hai_dfid))
1483                 car->car_hai->hai_dfid = pgs->hpk_fid;
1484
1485         if ((car->car_hai->hai_action == HSMA_RESTORE ||
1486              car->car_hai->hai_action == HSMA_ARCHIVE) &&
1487             (!lu_fid_eq(&pgs->hpk_fid, &car->car_hai->hai_dfid) &&
1488              !lu_fid_eq(&pgs->hpk_fid, &car->car_hai->hai_fid))) {
1489                 CERROR("%s: Progress on "DFID" for cookie "LPX64
1490                        " does not match request FID "DFID" nor data FID "
1491                        DFID"\n",
1492                        mdt_obd_name(mdt),
1493                        PFID(&pgs->hpk_fid), pgs->hpk_cookie,
1494                        PFID(&car->car_hai->hai_fid),
1495                        PFID(&car->car_hai->hai_dfid));
1496                 GOTO(out, rc = -EINVAL);
1497         }
1498
1499         if (pgs->hpk_errval != 0 && !(pgs->hpk_flags & HP_FLAG_COMPLETED)) {
1500                 CERROR("%s: Progress on "DFID" for cookie "LPX64" action=%s"
1501                        " is not coherent (err=%d and not completed"
1502                        " (flags=%d))\n",
1503                        mdt_obd_name(mdt),
1504                        PFID(&pgs->hpk_fid), pgs->hpk_cookie,
1505                        hsm_copytool_action2name(car->car_hai->hai_action),
1506                        pgs->hpk_errval, pgs->hpk_flags);
1507                 GOTO(out, rc = -EINVAL);
1508         }
1509
1510         /* now progress is valid */
1511
1512         /* we use a root like ucred */
1513         hsm_init_ucred(mdt_ucred(mti));
1514
1515         if (pgs->hpk_flags & HP_FLAG_COMPLETED) {
1516                 enum agent_req_status    status;
1517
1518                 rc = hsm_cdt_request_completed(mti, pgs, car, &status);
1519
1520                 /* remove request from memory list */
1521                 mdt_cdt_remove_request(cdt, pgs->hpk_cookie);
1522
1523                 CDEBUG(D_HSM, "Updating record: fid="DFID" cookie="LPX64
1524                               " action=%s status=%s\n", PFID(&pgs->hpk_fid),
1525                        pgs->hpk_cookie,
1526                        hsm_copytool_action2name(car->car_hai->hai_action),
1527                        agent_req_status2name(status));
1528
1529                 if (update_record) {
1530                         int rc1;
1531
1532                         rc1 = mdt_agent_record_update(mti->mti_env, mdt,
1533                                                      &pgs->hpk_cookie, 1,
1534                                                      status);
1535                         if (rc1)
1536                                 CERROR("%s: mdt_agent_record_update() failed,"
1537                                        " rc=%d, cannot update status to %s"
1538                                        " for cookie "LPX64"\n",
1539                                        mdt_obd_name(mdt), rc1,
1540                                        agent_req_status2name(status),
1541                                        pgs->hpk_cookie);
1542                         rc = (rc != 0 ? rc : rc1);
1543                 }
1544                 /* ct has completed a request, so a slot is available, wakeup
1545                  * cdt to find new work */
1546                 mdt_hsm_cdt_wakeup(mdt);
1547         } else {
1548                 /* if copytool send a progress on a canceled request
1549                  * we inform copytool it should stop
1550                  */
1551                 if (car->car_canceled == 1)
1552                         rc = -ECANCELED;
1553         }
1554         GOTO(out, rc);
1555
1556 out:
1557         /* remove ref got from mdt_cdt_update_request() */
1558         mdt_cdt_put_request(car);
1559
1560         return rc;
1561 }
1562
1563
/**
 * Callback argument handed to the llog processing callback that cancels
 * requests (mdt_cancel_all_cb()).
 */
struct hsm_cancel_all_data {
	/* MDT device the processed llog records belong to */
	struct mdt_device *mdt;
};
1571
1572 /**
1573  *  llog_cat_process() callback, used to:
1574  *  - purge all requests
1575  * \param env [IN] environment
1576  * \param llh [IN] llog handle
1577  * \param hdr [IN] llog record
1578  * \param data [IN] cb data = struct hsm_cancel_all_data
1579  * \retval 0 success
1580  * \retval -ve failure
1581  */
1582 static int mdt_cancel_all_cb(const struct lu_env *env,
1583                              struct llog_handle *llh,
1584                              struct llog_rec_hdr *hdr, void *data)
1585 {
1586         struct llog_agent_req_rec       *larr;
1587         struct hsm_cancel_all_data      *hcad;
1588         int                              rc = 0;
1589         ENTRY;
1590
1591         larr = (struct llog_agent_req_rec *)hdr;
1592         hcad = data;
1593         if (larr->arr_status == ARS_WAITING ||
1594             larr->arr_status == ARS_STARTED) {
1595                 larr->arr_status = ARS_CANCELED;
1596                 larr->arr_req_change = cfs_time_current_sec();
1597                 rc = mdt_agent_llog_update_rec(env, hcad->mdt, llh, larr);
1598                 if (rc == 0)
1599                         RETURN(LLOG_DEL_RECORD);
1600         }
1601         RETURN(rc);
1602 }
1603
1604 /**
1605  * cancel all actions
1606  * \param obd [IN] MDT device
1607  */
1608 static int hsm_cancel_all_actions(struct mdt_device *mdt)
1609 {
1610         struct mdt_thread_info          *mti;
1611         struct coordinator              *cdt = &mdt->mdt_coordinator;
1612         struct cdt_agent_req            *car;
1613         struct hsm_action_list          *hal = NULL;
1614         struct hsm_action_item          *hai;
1615         struct hsm_cancel_all_data       hcad;
1616         int                              hal_sz = 0, hal_len, rc;
1617         enum cdt_states                  save_state;
1618         ENTRY;
1619
1620         /* retrieve coordinator context */
1621         mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
1622
1623         /* disable coordinator */
1624         save_state = cdt->cdt_state;
1625         cdt->cdt_state = CDT_DISABLE;
1626
1627         /* send cancel to all running requests */
1628         down_read(&cdt->cdt_request_lock);
1629         list_for_each_entry(car, &cdt->cdt_requests, car_request_list) {
1630                 mdt_cdt_get_request(car);
1631                 /* request is not yet removed from list, it will be done
1632                  * when copytool will return progress
1633                  */
1634
1635                 if (car->car_hai->hai_action == HSMA_CANCEL) {
1636                         mdt_cdt_put_request(car);
1637                         continue;
1638                 }
1639
1640                 /* needed size */
1641                 hal_len = sizeof(*hal) + cfs_size_round(MTI_NAME_MAXLEN + 1) +
1642                           cfs_size_round(car->car_hai->hai_len);
1643
1644                 if (hal_len > hal_sz && hal_sz > 0) {
1645                         /* not enough room, free old buffer */
1646                         OBD_FREE(hal, hal_sz);
1647                         hal = NULL;
1648                 }
1649
1650                 /* empty buffer, allocate one */
1651                 if (hal == NULL) {
1652                         hal_sz = hal_len;
1653                         OBD_ALLOC(hal, hal_sz);
1654                         if (hal == NULL) {
1655                                 mdt_cdt_put_request(car);
1656                                 up_read(&cdt->cdt_request_lock);
1657                                 GOTO(out, rc = -ENOMEM);
1658                         }
1659                 }
1660
1661                 hal->hal_version = HAL_VERSION;
1662                 obd_uuid2fsname(hal->hal_fsname, mdt_obd_name(mdt),
1663                                 MTI_NAME_MAXLEN);
1664                 hal->hal_fsname[MTI_NAME_MAXLEN] = '\0';
1665                 hal->hal_compound_id = car->car_compound_id;
1666                 hal->hal_archive_id = car->car_archive_id;
1667                 hal->hal_flags = car->car_flags;
1668                 hal->hal_count = 0;
1669
1670                 hai = hai_first(hal);
1671                 memcpy(hai, car->car_hai, car->car_hai->hai_len);
1672                 hai->hai_action = HSMA_CANCEL;
1673                 hal->hal_count = 1;
1674
1675                 /* it is possible to safely call mdt_hsm_agent_send()
1676                  * (ie without a deadlock on cdt_request_lock), because the
1677                  * write lock is taken only if we are not in purge mode
1678                  * (mdt_hsm_agent_send() does not call mdt_cdt_add_request()
1679                  *   nor mdt_cdt_remove_request())
1680                  */
1681                 /* no conflict with cdt thread because cdt is disable and we
1682                  * have the request lock */
1683                 mdt_hsm_agent_send(mti, hal, 1);
1684
1685                 mdt_cdt_put_request(car);
1686         }
1687         up_read(&cdt->cdt_request_lock);
1688
1689         if (hal != NULL)
1690                 OBD_FREE(hal, hal_sz);
1691
1692         /* cancel all on-disk records */
1693         hcad.mdt = mdt;
1694
1695         rc = cdt_llog_process(mti->mti_env, mti->mti_mdt,
1696                               mdt_cancel_all_cb, &hcad);
1697 out:
1698         /* enable coordinator */
1699         cdt->cdt_state = save_state;
1700
1701         RETURN(rc);
1702 }
1703
1704 /**
1705  * check if a request is comptaible with file status
1706  * \param hai [IN] request description
1707  * \param hal_an [IN] request archive number (not used)
1708  * \param rq_flags [IN] request flags
1709  * \param hsm [IN] file HSM metadata
1710  * \retval boolean
1711  */
1712 bool mdt_hsm_is_action_compat(const struct hsm_action_item *hai,
1713                               const int hal_an, const __u64 rq_flags,
1714                               const struct md_hsm *hsm)
1715 {
1716         int      is_compat = false;
1717         int      hsm_flags;
1718         ENTRY;
1719
1720         hsm_flags = hsm->mh_flags;
1721         switch (hai->hai_action) {
1722         case HSMA_ARCHIVE:
1723                 if (!(hsm_flags & HS_NOARCHIVE) &&
1724                     (hsm_flags & HS_DIRTY || !(hsm_flags & HS_ARCHIVED)))
1725                         is_compat = true;
1726                 break;
1727         case HSMA_RESTORE:
1728                 if (!(hsm_flags & HS_DIRTY) && (hsm_flags & HS_RELEASED) &&
1729                     hsm_flags & HS_ARCHIVED && !(hsm_flags & HS_LOST))
1730                         is_compat = true;
1731                 break;
1732         case HSMA_REMOVE:
1733                 if (!(hsm_flags & HS_RELEASED) &&
1734                     (hsm_flags & (HS_ARCHIVED | HS_EXISTS)))
1735                         is_compat = true;
1736                 break;
1737         case HSMA_CANCEL:
1738                 is_compat = true;
1739                 break;
1740         }
1741         CDEBUG(D_HSM, "fid="DFID" action=%s flags="LPX64
1742                       " extent="LPX64"-"LPX64" hsm_flags=%.8X %s\n",
1743                       PFID(&hai->hai_fid),
1744                       hsm_copytool_action2name(hai->hai_action), rq_flags,
1745                       hai->hai_extent.offset, hai->hai_extent.length,
1746                       hsm->mh_flags,
1747                       (is_compat ? "compatible" : "uncompatible"));
1748
1749         RETURN(is_compat);
1750 }
1751
1752 /*
1753  * /proc interface used to get/set HSM behaviour (cdt->cdt_policy)
1754  */
1755 static const struct {
1756         __u64            bit;
1757         char            *name;
1758         char            *nickname;
1759 } hsm_policy_names[] = {
1760         { CDT_NONBLOCKING_RESTORE,      "NonBlockingRestore",   "NBR"},
1761         { CDT_NORETRY_ACTION,           "NoRetryAction",        "NRA"},
1762         { 0 },
1763 };
1764
1765 /**
1766  * convert a policy name to a bit
1767  * \param name [IN] policy name
1768  * \retval 0 unknown
1769  * \retval   policy bit
1770  */
1771 static __u64 hsm_policy_str2bit(const char *name)
1772 {
1773         int      i;
1774
1775         for (i = 0; hsm_policy_names[i].bit != 0; i++)
1776                 if (strcmp(hsm_policy_names[i].nickname, name) == 0 ||
1777                     strcmp(hsm_policy_names[i].name, name) == 0)
1778                         return hsm_policy_names[i].bit;
1779         return 0;
1780 }
1781
1782 /**
1783  * convert a policy bit field to a string
1784  * \param mask [IN] policy bit field
1785  * \param hexa [IN] print mask before bit names
1786  * \param buffer [OUT] string
1787  * \param count [IN] size of buffer
1788  */
1789 static void hsm_policy_bit2str(struct seq_file *m, const __u64 mask,
1790                                 const bool hexa)
1791 {
1792         int      i, j;
1793         __u64    bit;
1794         ENTRY;
1795
1796         if (hexa)
1797                 seq_printf(m, "("LPX64") ", mask);
1798
1799         for (i = 0; i < CDT_POLICY_SHIFT_COUNT; i++) {
1800                 bit = (1ULL << i);
1801
1802                 for (j = 0; hsm_policy_names[j].bit != 0; j++) {
1803                         if (hsm_policy_names[j].bit == bit)
1804                                 break;
1805                 }
1806                 if (bit & mask)
1807                         seq_printf(m, "[%s] ", hsm_policy_names[j].name);
1808                 else
1809                         seq_printf(m, "%s ", hsm_policy_names[j].name);
1810         }
1811         /* remove last ' ' */
1812         m->count--;
1813         seq_putc(m, '\0');
1814 }
1815
1816 /* methods to read/write HSM policy flags */
1817 static int mdt_hsm_policy_seq_show(struct seq_file *m, void *data)
1818 {
1819         struct mdt_device       *mdt = m->private;
1820         struct coordinator      *cdt = &mdt->mdt_coordinator;
1821         ENTRY;
1822
1823         hsm_policy_bit2str(m, cdt->cdt_policy, false);
1824         RETURN(0);
1825 }
1826
1827 static ssize_t
1828 mdt_hsm_policy_seq_write(struct file *file, const char __user *buffer,
1829                          size_t count, loff_t *off)
1830 {
1831         struct seq_file         *m = file->private_data;
1832         struct mdt_device       *mdt = m->private;
1833         struct coordinator      *cdt = &mdt->mdt_coordinator;
1834         char                    *start, *token, sign;
1835         char                    *buf;
1836         __u64                    policy;
1837         __u64                    add_mask, remove_mask, set_mask;
1838         int                      rc;
1839         ENTRY;
1840
1841         if (count + 1 > PAGE_SIZE)
1842                 RETURN(-EINVAL);
1843
1844         OBD_ALLOC(buf, count + 1);
1845         if (buf == NULL)
1846                 RETURN(-ENOMEM);
1847
1848         if (copy_from_user(buf, buffer, count))
1849                 GOTO(out, rc = -EFAULT);
1850
1851         buf[count] = '\0';
1852
1853         start = buf;
1854         CDEBUG(D_HSM, "%s: receive new policy: '%s'\n", mdt_obd_name(mdt),
1855                start);
1856
1857         add_mask = remove_mask = set_mask = 0;
1858         do {
1859                 token = strsep(&start, "\n ");
1860                 sign = *token;
1861
1862                 if (sign == '\0')
1863                         continue;
1864
1865                 if (sign == '-' || sign == '+')
1866                         token++;
1867
1868                 policy = hsm_policy_str2bit(token);
1869                 if (policy == 0) {
1870                         CWARN("%s: '%s' is unknown, "
1871                               "supported policies are:\n", mdt_obd_name(mdt),
1872                               token);
1873                         hsm_policy_bit2str(m, 0, false);
1874                         GOTO(out, rc = -EINVAL);
1875                 }
1876                 switch (sign) {
1877                 case '-':
1878                         remove_mask |= policy;
1879                         break;
1880                 case '+':
1881                         add_mask |= policy;
1882                         break;
1883                 default:
1884                         set_mask |= policy;
1885                         break;
1886                 }
1887
1888         } while (start != NULL);
1889
1890         CDEBUG(D_HSM, "%s: new policy: rm="LPX64" add="LPX64" set="LPX64"\n",
1891                mdt_obd_name(mdt), remove_mask, add_mask, set_mask);
1892
1893         /* if no sign in all string, it is a clear and set
1894          * if some sign found, all unsigned are converted
1895          * to add
1896          * P1 P2 = set to P1 and P2
1897          * P1 -P2 = add P1 clear P2 same as +P1 -P2
1898          */
1899         if (remove_mask == 0 && add_mask == 0) {
1900                 cdt->cdt_policy = set_mask;
1901         } else {
1902                 cdt->cdt_policy |= set_mask | add_mask;
1903                 cdt->cdt_policy &= ~remove_mask;
1904         }
1905
1906         GOTO(out, rc = count);
1907
1908 out:
1909         OBD_FREE(buf, count + 1);
1910         RETURN(rc);
1911 }
1912 LPROC_SEQ_FOPS(mdt_hsm_policy);
1913
1914 #define GENERATE_PROC_METHOD(VAR)                                       \
1915 static int mdt_hsm_##VAR##_seq_show(struct seq_file *m, void *data)     \
1916 {                                                                       \
1917         struct mdt_device       *mdt = m->private;                      \
1918         struct coordinator      *cdt = &mdt->mdt_coordinator;           \
1919         ENTRY;                                                          \
1920                                                                         \
1921         seq_printf(m, LPU64"\n", (__u64)cdt->VAR);                      \
1922         RETURN(0);                                                      \
1923 }                                                                       \
1924 static ssize_t                                                          \
1925 mdt_hsm_##VAR##_seq_write(struct file *file, const char __user *buffer, \
1926                           size_t count, loff_t *off)                    \
1927                                                                         \
1928 {                                                                       \
1929         struct seq_file         *m = file->private_data;                \
1930         struct mdt_device       *mdt = m->private;                      \
1931         struct coordinator      *cdt = &mdt->mdt_coordinator;           \
1932         int                      val;                                   \
1933         int                      rc;                                    \
1934         ENTRY;                                                          \
1935                                                                         \
1936         rc = lprocfs_write_helper(buffer, count, &val);                 \
1937         if (rc)                                                         \
1938                 RETURN(rc);                                             \
1939         if (val > 0) {                                                  \
1940                 cdt->VAR = val;                                         \
1941                 RETURN(count);                                          \
1942         }                                                               \
1943         RETURN(-EINVAL);                                                \
1944 }                                                                       \
1945
1946 GENERATE_PROC_METHOD(cdt_loop_period)
1947 GENERATE_PROC_METHOD(cdt_grace_delay)
1948 GENERATE_PROC_METHOD(cdt_active_req_timeout)
1949 GENERATE_PROC_METHOD(cdt_max_requests)
1950 GENERATE_PROC_METHOD(cdt_default_archive_id)
1951
1952 /*
1953  * procfs write method for MDT/hsm_control
1954  * proc entry is in mdt directory so data is mdt obd_device pointer
1955  */
1956 #define CDT_ENABLE_CMD   "enabled"
1957 #define CDT_STOP_CMD     "shutdown"
1958 #define CDT_DISABLE_CMD  "disabled"
1959 #define CDT_PURGE_CMD    "purge"
1960 #define CDT_HELP_CMD     "help"
1961 #define CDT_MAX_CMD_LEN  10
1962
1963 ssize_t
1964 mdt_hsm_cdt_control_seq_write(struct file *file, const char __user *buffer,
1965                               size_t count, loff_t *off)
1966 {
1967         struct seq_file         *m = file->private_data;
1968         struct obd_device       *obd = m->private;
1969         struct mdt_device       *mdt = mdt_dev(obd->obd_lu_dev);
1970         struct coordinator      *cdt = &(mdt->mdt_coordinator);
1971         int                      rc, usage = 0;
1972         char                     kernbuf[CDT_MAX_CMD_LEN];
1973         ENTRY;
1974
1975         if (count == 0 || count >= sizeof(kernbuf))
1976                 RETURN(-EINVAL);
1977
1978         if (copy_from_user(kernbuf, buffer, count))
1979                 RETURN(-EFAULT);
1980         kernbuf[count] = 0;
1981
1982         if (kernbuf[count - 1] == '\n')
1983                 kernbuf[count - 1] = 0;
1984
1985         rc = 0;
1986         if (strcmp(kernbuf, CDT_ENABLE_CMD) == 0) {
1987                 if (cdt->cdt_state == CDT_DISABLE) {
1988                         cdt->cdt_state = CDT_RUNNING;
1989                         mdt_hsm_cdt_wakeup(mdt);
1990                 } else {
1991                         rc = mdt_hsm_cdt_start(mdt);
1992                 }
1993         } else if (strcmp(kernbuf, CDT_STOP_CMD) == 0) {
1994                 if ((cdt->cdt_state == CDT_STOPPING) ||
1995                     (cdt->cdt_state == CDT_STOPPED)) {
1996                         CERROR("%s: Coordinator already stopped\n",
1997                                mdt_obd_name(mdt));
1998                         rc = -EALREADY;
1999                 } else {
2000                         cdt->cdt_state = CDT_STOPPING;
2001                 }
2002         } else if (strcmp(kernbuf, CDT_DISABLE_CMD) == 0) {
2003                 if ((cdt->cdt_state == CDT_STOPPING) ||
2004                     (cdt->cdt_state == CDT_STOPPED)) {
2005                         CERROR("%s: Coordinator is stopped\n",
2006                                mdt_obd_name(mdt));
2007                         rc = -EINVAL;
2008                 } else {
2009                         cdt->cdt_state = CDT_DISABLE;
2010                 }
2011         } else if (strcmp(kernbuf, CDT_PURGE_CMD) == 0) {
2012                 rc = hsm_cancel_all_actions(mdt);
2013         } else if (strcmp(kernbuf, CDT_HELP_CMD) == 0) {
2014                 usage = 1;
2015         } else {
2016                 usage = 1;
2017                 rc = -EINVAL;
2018         }
2019
2020         if (usage == 1)
2021                 CERROR("%s: Valid coordinator control commands are: "
2022                        "%s %s %s %s %s\n", mdt_obd_name(mdt),
2023                        CDT_ENABLE_CMD, CDT_STOP_CMD, CDT_DISABLE_CMD,
2024                        CDT_PURGE_CMD, CDT_HELP_CMD);
2025
2026         if (rc)
2027                 RETURN(rc);
2028
2029         RETURN(count);
2030 }
2031
2032 int mdt_hsm_cdt_control_seq_show(struct seq_file *m, void *data)
2033 {
2034         struct obd_device       *obd = m->private;
2035         struct coordinator      *cdt;
2036         ENTRY;
2037
2038         cdt = &(mdt_dev(obd->obd_lu_dev)->mdt_coordinator);
2039
2040         if (cdt->cdt_state == CDT_INIT)
2041                 seq_printf(m, "init\n");
2042         else if (cdt->cdt_state == CDT_RUNNING)
2043                 seq_printf(m, "enabled\n");
2044         else if (cdt->cdt_state == CDT_STOPPING)
2045                 seq_printf(m, "stopping\n");
2046         else if (cdt->cdt_state == CDT_STOPPED)
2047                 seq_printf(m, "stopped\n");
2048         else if (cdt->cdt_state == CDT_DISABLE)
2049                 seq_printf(m, "disabled\n");
2050         else
2051                 seq_printf(m, "unknown\n");
2052
2053         RETURN(0);
2054 }
2055
2056 static int
2057 mdt_hsm_request_mask_show(struct seq_file *m, __u64 mask)
2058 {
2059         int i, rc = 0;
2060         ENTRY;
2061
2062         for (i = 0; i < 8 * sizeof(mask); i++) {
2063                 if (mask & (1UL << i))
2064                         rc += seq_printf(m, "%s%s", rc == 0 ? "" : " ",
2065                                         hsm_copytool_action2name(i));
2066         }
2067         rc += seq_printf(m, "\n");
2068
2069         RETURN(rc);
2070 }
2071
2072 static int
2073 mdt_hsm_user_request_mask_seq_show(struct seq_file *m, void *data)
2074 {
2075         struct mdt_device *mdt = m->private;
2076         struct coordinator *cdt = &mdt->mdt_coordinator;
2077
2078         return mdt_hsm_request_mask_show(m, cdt->cdt_user_request_mask);
2079 }
2080
2081 static int
2082 mdt_hsm_group_request_mask_seq_show(struct seq_file *m, void *data)
2083 {
2084         struct mdt_device *mdt = m->private;
2085         struct coordinator *cdt = &mdt->mdt_coordinator;
2086
2087         return mdt_hsm_request_mask_show(m, cdt->cdt_group_request_mask);
2088 }
2089
2090 static int
2091 mdt_hsm_other_request_mask_seq_show(struct seq_file *m, void *data)
2092 {
2093         struct mdt_device *mdt = m->private;
2094         struct coordinator *cdt = &mdt->mdt_coordinator;
2095
2096         return mdt_hsm_request_mask_show(m, cdt->cdt_other_request_mask);
2097 }
2098
2099 static inline enum hsm_copytool_action
2100 hsm_copytool_name2action(const char *name)
2101 {
2102         if (strcasecmp(name, "NOOP") == 0)
2103                 return HSMA_NONE;
2104         else if (strcasecmp(name, "ARCHIVE") == 0)
2105                 return HSMA_ARCHIVE;
2106         else if (strcasecmp(name, "RESTORE") == 0)
2107                 return HSMA_RESTORE;
2108         else if (strcasecmp(name, "REMOVE") == 0)
2109                 return HSMA_REMOVE;
2110         else if (strcasecmp(name, "CANCEL") == 0)
2111                 return HSMA_CANCEL;
2112         else
2113                 return -1;
2114 }
2115
2116 static ssize_t
2117 mdt_write_hsm_request_mask(struct file *file, const char __user *user_buf,
2118                             size_t user_count, __u64 *mask)
2119 {
2120         char *buf, *pos, *name;
2121         size_t buf_size;
2122         __u64 new_mask = 0;
2123         int rc;
2124         ENTRY;
2125
2126         if (!(user_count < 4096))
2127                 RETURN(-ENOMEM);
2128
2129         buf_size = user_count + 1;
2130
2131         OBD_ALLOC(buf, buf_size);
2132         if (buf == NULL)
2133                 RETURN(-ENOMEM);
2134
2135         if (copy_from_user(buf, user_buf, buf_size - 1))
2136                 GOTO(out, rc = -EFAULT);
2137
2138         buf[buf_size - 1] = '\0';
2139
2140         pos = buf;
2141         while ((name = strsep(&pos, " \t\v\n")) != NULL) {
2142                 int action;
2143
2144                 if (*name == '\0')
2145                         continue;
2146
2147                 action = hsm_copytool_name2action(name);
2148                 if (action < 0)
2149                         GOTO(out, rc = -EINVAL);
2150
2151                 new_mask |= (1UL << action);
2152         }
2153
2154         *mask = new_mask;
2155         rc = user_count;
2156 out:
2157         OBD_FREE(buf, buf_size);
2158
2159         RETURN(rc);
2160 }
2161
2162 static ssize_t
2163 mdt_hsm_user_request_mask_seq_write(struct file *file, const char __user *buf,
2164                                         size_t count, loff_t *off)
2165 {
2166         struct seq_file         *m = file->private_data;
2167         struct mdt_device       *mdt = m->private;
2168         struct coordinator *cdt = &mdt->mdt_coordinator;
2169
2170         return mdt_write_hsm_request_mask(file, buf, count,
2171                                            &cdt->cdt_user_request_mask);
2172 }
2173
2174 static ssize_t
2175 mdt_hsm_group_request_mask_seq_write(struct file *file, const char __user *buf,
2176                                         size_t count, loff_t *off)
2177 {
2178         struct seq_file         *m = file->private_data;
2179         struct mdt_device       *mdt = m->private;
2180         struct coordinator      *cdt = &mdt->mdt_coordinator;
2181
2182         return mdt_write_hsm_request_mask(file, buf, count,
2183                                            &cdt->cdt_group_request_mask);
2184 }
2185
2186 static ssize_t
2187 mdt_hsm_other_request_mask_seq_write(struct file *file, const char __user *buf,
2188                                         size_t count, loff_t *off)
2189 {
2190         struct seq_file         *m = file->private_data;
2191         struct mdt_device       *mdt = m->private;
2192         struct coordinator      *cdt = &mdt->mdt_coordinator;
2193
2194         return mdt_write_hsm_request_mask(file, buf, count,
2195                                            &cdt->cdt_other_request_mask);
2196 }
2197
2198 LPROC_SEQ_FOPS(mdt_hsm_cdt_loop_period);
2199 LPROC_SEQ_FOPS(mdt_hsm_cdt_grace_delay);
2200 LPROC_SEQ_FOPS(mdt_hsm_cdt_active_req_timeout);
2201 LPROC_SEQ_FOPS(mdt_hsm_cdt_max_requests);
2202 LPROC_SEQ_FOPS(mdt_hsm_cdt_default_archive_id);
2203 LPROC_SEQ_FOPS(mdt_hsm_user_request_mask);
2204 LPROC_SEQ_FOPS(mdt_hsm_group_request_mask);
2205 LPROC_SEQ_FOPS(mdt_hsm_other_request_mask);
2206
2207 static struct lprocfs_seq_vars lprocfs_mdt_hsm_vars[] = {
2208         { .name =       "agents",
2209           .fops =       &mdt_hsm_agent_fops                     },
2210         { .name =       "actions",
2211           .fops =       &mdt_hsm_actions_fops,
2212           .proc_mode =  0444                                    },
2213         { .name =       "default_archive_id",
2214           .fops =       &mdt_hsm_cdt_default_archive_id_fops    },
2215         { .name =       "grace_delay",
2216           .fops =       &mdt_hsm_cdt_grace_delay_fops           },
2217         { .name =       "loop_period",
2218           .fops =       &mdt_hsm_cdt_loop_period_fops           },
2219         { .name =       "max_requests",
2220           .fops =       &mdt_hsm_cdt_max_requests_fops          },
2221         { .name =       "policy",
2222           .fops =       &mdt_hsm_policy_fops                    },
2223         { .name =       "active_request_timeout",
2224           .fops =       &mdt_hsm_cdt_active_req_timeout_fops    },
2225         { .name =       "active_requests",
2226           .fops =       &mdt_hsm_active_requests_fops           },
2227         { .name =       "user_request_mask",
2228           .fops =       &mdt_hsm_user_request_mask_fops,        },
2229         { .name =       "group_request_mask",
2230           .fops =       &mdt_hsm_group_request_mask_fops,       },
2231         { .name =       "other_request_mask",
2232           .fops =       &mdt_hsm_other_request_mask_fops,       },
2233         { 0 }
2234 };