Whamcloud - gitweb
LU-8626 hsm: count the number of started requests of each type
[fs/lustre-release.git] / lustre / mdt / mdt_coordinator.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * Copyright (c) 2011, 2012 Commissariat a l'energie atomique et aux energies
24  *                          alternatives
25  *
26  * Copyright (c) 2013, 2016, Intel Corporation.
27  * Use is subject to license terms.
28  */
29 /*
30  * lustre/mdt/mdt_coordinator.c
31  *
32  * Lustre HSM Coordinator
33  *
34  * Author: Jacques-Charles Lafoucriere <jacques-charles.lafoucriere@cea.fr>
35  * Author: Aurelien Degremont <aurelien.degremont@cea.fr>
36  * Author: Thomas Leibovici <thomas.leibovici@cea.fr>
37  */
38
39 #define DEBUG_SUBSYSTEM S_MDS
40
41 #include <linux/kthread.h>
42 #include <obd_support.h>
43 #include <lustre_export.h>
44 #include <obd.h>
45 #include <lprocfs_status.h>
46 #include <lustre_log.h>
47 #include <lustre_kernelcomm.h>
48 #include "mdt_internal.h"
49
/* Forward declaration of the coordinator procfs variable table. It is handed
 * to lprocfs_register() by hsm_cdt_procfs_init() and returned by
 * hsm_cdt_get_proc_vars(); the definition is expected later in this file. */
static struct lprocfs_vars lprocfs_mdt_hsm_vars[];
51
52 /**
53  * get obj and HSM attributes on a fid
54  * \param mti [IN] context
55  * \param fid [IN] object fid
56  * \param hsm [OUT] HSM meta data
57  * \retval obj or error (-ENOENT if not found)
58  */
59 struct mdt_object *mdt_hsm_get_md_hsm(struct mdt_thread_info *mti,
60                                       const struct lu_fid *fid,
61                                       struct md_hsm *hsm)
62 {
63         struct md_attr          *ma;
64         struct mdt_object       *obj;
65         int                      rc;
66         ENTRY;
67
68         ma = &mti->mti_attr;
69         ma->ma_need = MA_HSM;
70         ma->ma_valid = 0;
71
72         /* find object by FID */
73         obj = mdt_object_find(mti->mti_env, mti->mti_mdt, fid);
74         if (IS_ERR(obj))
75                 RETURN(obj);
76
77         if (!mdt_object_exists(obj)) {
78                 /* no more object */
79                 mdt_object_put(mti->mti_env, obj);
80                 RETURN(ERR_PTR(-ENOENT));
81         }
82
83         rc = mdt_attr_get_complex(mti, obj, ma);
84         if (rc) {
85                 mdt_object_put(mti->mti_env, obj);
86                 RETURN(ERR_PTR(rc));
87         }
88
89         if (ma->ma_valid & MA_HSM)
90                 *hsm = ma->ma_hsm;
91         else
92                 memset(hsm, 0, sizeof(*hsm));
93         ma->ma_valid = 0;
94         RETURN(obj);
95 }
96
97 void mdt_hsm_dump_hal(int level, const char *prefix,
98                       struct hsm_action_list *hal)
99 {
100         int                      i, sz;
101         struct hsm_action_item  *hai;
102         char                     buf[12];
103
104         CDEBUG(level, "%s: HAL header: version %X count %d compound %#llx"
105                       " archive_id %d flags %#llx\n",
106                prefix, hal->hal_version, hal->hal_count,
107                hal->hal_compound_id, hal->hal_archive_id, hal->hal_flags);
108
109         hai = hai_first(hal);
110         for (i = 0; i < hal->hal_count; i++) {
111                 sz = hai->hai_len - sizeof(*hai);
112                 CDEBUG(level, "%s %d: fid="DFID" dfid="DFID
113                        " compound/cookie=%#llx/%#llx"
114                        " action=%s extent=%#llx-%#llx gid=%#llx"
115                        " datalen=%d data=[%s]\n",
116                        prefix, i,
117                        PFID(&hai->hai_fid), PFID(&hai->hai_dfid),
118                        hal->hal_compound_id, hai->hai_cookie,
119                        hsm_copytool_action2name(hai->hai_action),
120                        hai->hai_extent.offset,
121                        hai->hai_extent.length,
122                        hai->hai_gid, sz,
123                        hai_dump_data_field(hai, buf, sizeof(buf)));
124                 hai = hai_next(hai);
125         }
126 }
127
/**
 * One action list (HAL) being assembled by mdt_coordinator_cb() while the
 * agent request llog is scanned. An array of these is carried in
 * struct hsm_scan_data, the data passed to the llog processing callback.
 */
struct hsm_scan_request {
        /* size in bytes of the buffer allocated for hal; also the size
         * used when that buffer is freed */
        int                      hal_sz;
        /* bytes of the buffer currently in use: the list header plus the
         * rounded length of every item appended so far */
        int                      hal_used_sz;
        /* the action list itself */
        struct hsm_action_list  *hal;
};
137
/**
 * Callback data for mdt_coordinator_cb(): scan parameters set up by the
 * coordinator thread and the vector of requests the callback fills in.
 */
struct hsm_scan_data {
        /* thread context; the callback reaches the mdt device through it */
        struct mdt_thread_info          *mti;
        /* file system name copied into every new action list */
        char                             fs_name[MTI_NAME_MAXLEN+1];
        /* are we scanning the logs for housekeeping, or just looking
         * for new work?
         */
        bool                             housekeeping;
        /* requests to be sent to agents */
        int                              max_requests;  /** vector size */
        int                              request_cnt;   /** used count */
        struct hsm_scan_request         *request;
};
150
/**
 * Arguments received by the coordinator thread, mdt_coordinator(), through
 * its void *data parameter.
 */
struct hsm_thread_data {
        /* thread context used by the coordinator thread */
        struct mdt_thread_info  *cdt_mti;
        /* initial request vector; the thread adopts it as its scan vector and
         * frees it assuming it holds cdt_max_requests entries */
        struct hsm_scan_request *request;
};
155 /**
156  * update status of a request
157  * this function is to be called from llog_cat_process()
158  * \param mti [IN] context
159  * \param llh [IN] llog handle
160  * \param hdr [IN] llog record
161  * \retval 0 success
162  * \retval -ve failure
163  */
164 static int mdt_hsm_complete_request(struct mdt_thread_info *mti,
165                                     struct llog_handle *llh,
166                                     struct llog_rec_hdr *hdr)
167 {
168         struct llog_agent_req_rec *larr = (struct llog_agent_req_rec *)hdr;
169         time64_t now = ktime_get_real_seconds();
170         struct hsm_progress_kernel pgs = {
171                 .hpk_fid = larr->arr_hai.hai_fid,
172                 .hpk_cookie = larr->arr_hai.hai_cookie,
173                 .hpk_extent = larr->arr_hai.hai_extent,
174                 .hpk_flags = HP_FLAG_COMPLETED,
175                 .hpk_errval = ENOSYS,
176                 .hpk_data_version = 0
177         };
178         int rc = 0;
179         ENTRY;
180         /* update request state, but do not record in llog, to
181          * avoid deadlock on cdt_llog_lock */
182         rc = mdt_hsm_update_request_state(mti, &pgs, 0);
183         if (rc) {
184                 CERROR("%s: cannot update timed out/cancelled request: "
185                        DFID" for cookie %#llx action=%s\n",
186                        mdt_obd_name(mti->mti_mdt),
187                        PFID(&pgs.hpk_fid), pgs.hpk_cookie,
188                        hsm_copytool_action2name(larr->arr_hai.hai_action));
189                 RETURN(rc);
190         }
191
192         /* XXX A cancel request cannot be cancelled. */
193         if (larr->arr_hai.hai_action == HSMA_CANCEL)
194                 RETURN(0);
195
196         larr->arr_status = ARS_CANCELED;
197         larr->arr_req_change = now;
198         rc = llog_write(mti->mti_env, llh, hdr, hdr->lrh_index);
199         if (rc < 0)
200                 CERROR("%s: cannot update agent log: rc = %d\n",
201                        mdt_obd_name(mti->mti_mdt), rc);
202
203         RETURN(rc);
204 }
205
206 /**
207  *  llog_cat_process() callback, used to:
208  *  - find waiting request and start action
209  *  - purge canceled and done requests
210  * \param env [IN] environment
211  * \param llh [IN] llog handle
212  * \param hdr [IN] llog record
213  * \param data [IN/OUT] cb data = struct hsm_scan_data
214  * \retval 0 success
215  * \retval -ve failure
216  */
217 static int mdt_coordinator_cb(const struct lu_env *env,
218                               struct llog_handle *llh,
219                               struct llog_rec_hdr *hdr,
220                               void *data)
221 {
222         struct llog_agent_req_rec       *larr;
223         struct hsm_scan_data            *hsd;
224         struct hsm_action_item          *hai;
225         struct mdt_device               *mdt;
226         struct coordinator              *cdt;
227         int                              rc;
228         ENTRY;
229
230         hsd = data;
231         mdt = hsd->mti->mti_mdt;
232         cdt = &mdt->mdt_coordinator;
233
234         larr = (struct llog_agent_req_rec *)hdr;
235         dump_llog_agent_req_rec("mdt_coordinator_cb(): ", larr);
236         switch (larr->arr_status) {
237         case ARS_WAITING: {
238                 int i;
239                 struct hsm_scan_request *request;
240
241                 /* Are agents full? */
242                 if (atomic_read(&cdt->cdt_request_count) >=
243                     cdt->cdt_max_requests)
244                         break;
245
246                 /* first search whether the request is found in the
247                  * list we have built. */
248                 request = NULL;
249                 for (i = 0; i < hsd->request_cnt; i++) {
250                         if (hsd->request[i].hal->hal_compound_id ==
251                             larr->arr_compound_id) {
252                                 request = &hsd->request[i];
253                                 break;
254                         }
255                 }
256
257                 if (!request) {
258                         struct hsm_action_list *hal;
259
260                         if (hsd->request_cnt == hsd->max_requests) {
261                                 if (!hsd->housekeeping) {
262                                         /* The request array is full,
263                                          * stop here. There might be
264                                          * more known requests that
265                                          * could be merged, but this
266                                          * avoid analyzing too many
267                                          * llogs for minor gains.
268                                          */
269                                         RETURN(LLOG_PROC_BREAK);
270                                 } else {
271                                         /* Unknown request and no more room
272                                          * for a new request. Continue to scan
273                                          * to find other entries for already
274                                          * existing requests.
275                                          */
276                                         RETURN(0);
277                                 }
278                         }
279
280                         request = &hsd->request[hsd->request_cnt];
281
282                         /* allocates hai vector size just needs to be large
283                          * enough */
284                         request->hal_sz =
285                                 sizeof(*request->hal) +
286                                 cfs_size_round(MTI_NAME_MAXLEN+1) +
287                                 2 * cfs_size_round(larr->arr_hai.hai_len);
288                         OBD_ALLOC(hal, request->hal_sz);
289                         if (!hal)
290                                 RETURN(-ENOMEM);
291                         hal->hal_version = HAL_VERSION;
292                         strlcpy(hal->hal_fsname, hsd->fs_name,
293                                 MTI_NAME_MAXLEN + 1);
294                         hal->hal_compound_id = larr->arr_compound_id;
295                         hal->hal_archive_id = larr->arr_archive_id;
296                         hal->hal_flags = larr->arr_flags;
297                         hal->hal_count = 0;
298                         request->hal_used_sz = hal_size(hal);
299                         request->hal = hal;
300                         hsd->request_cnt++;
301                         hai = hai_first(hal);
302                 } else {
303                         /* request is known */
304                         /* we check if record archive num is the same as the
305                          * known request, if not we will serve it in multiple
306                          * time because we do not know if the agent can serve
307                          * multiple backend
308                          * a use case is a compound made of multiple restore
309                          * where the files are not archived in the same backend
310                          */
311                         if (larr->arr_archive_id !=
312                             request->hal->hal_archive_id)
313                                 RETURN(0);
314
315                         if (request->hal_sz <
316                             request->hal_used_sz +
317                             cfs_size_round(larr->arr_hai.hai_len)) {
318                                 /* Not enough room, need an extension */
319                                 void *hal_buffer;
320                                 int sz;
321
322                                 sz = 2 * request->hal_sz;
323                                 OBD_ALLOC(hal_buffer, sz);
324                                 if (!hal_buffer)
325                                         RETURN(-ENOMEM);
326                                 memcpy(hal_buffer, request->hal,
327                                        request->hal_used_sz);
328                                 OBD_FREE(request->hal,
329                                          request->hal_sz);
330                                 request->hal = hal_buffer;
331                                 request->hal_sz = sz;
332                         }
333                         hai = hai_first(request->hal);
334                         for (i = 0; i < request->hal->hal_count; i++)
335                                 hai = hai_next(hai);
336                 }
337                 memcpy(hai, &larr->arr_hai, larr->arr_hai.hai_len);
338                 hai->hai_cookie = larr->arr_hai.hai_cookie;
339                 hai->hai_gid = larr->arr_hai.hai_gid;
340
341                 request->hal_used_sz += cfs_size_round(hai->hai_len);
342                 request->hal->hal_count++;
343
344                 if (hai->hai_action != HSMA_CANCEL)
345                         cdt_agent_record_hash_add(cdt, hai->hai_cookie,
346                                                   llh->lgh_hdr->llh_cat_idx,
347                                                   hdr->lrh_index);
348                 break;
349         }
350         case ARS_STARTED: {
351                 struct cdt_agent_req *car;
352                 time64_t now = ktime_get_real_seconds();
353                 time64_t last;
354
355                 if (!hsd->housekeeping)
356                         break;
357
358                 /* we search for a running request
359                  * error may happen if coordinator crashes or stopped
360                  * with running request
361                  */
362                 car = mdt_cdt_find_request(cdt, larr->arr_hai.hai_cookie);
363                 if (car == NULL) {
364                         last = larr->arr_req_change;
365                 } else {
366                         last = car->car_req_update;
367                         mdt_cdt_put_request(car);
368                 }
369
370                 /* test if request too long, if yes cancel it
371                  * the same way the copy tool acknowledge a cancel request */
372                 if (now <= last + cdt->cdt_active_req_timeout)
373                         RETURN(0);
374
375                 dump_llog_agent_req_rec("request timed out, start cleaning",
376                                         larr);
377                 /* a too old cancel request just needs to be removed
378                  * this can happen, if copy tool does not support
379                  * cancel for other requests, we have to remove the
380                  * running request and notify the copytool */
381                 rc = mdt_hsm_complete_request(hsd->mti, llh, hdr);
382                 if (rc == -ENOENT) {
383                         /* The request no longer exists, forget
384                          * about it, and do not send a cancel request
385                          * to the client, for which an error will be
386                          * sent back, leading to an endless cycle of
387                          * cancellation. */
388                         cdt_agent_record_hash_del(cdt,
389                                                   larr->arr_hai.hai_cookie);
390                         RETURN(LLOG_DEL_RECORD);
391                 }
392                 break;
393         }
394         case ARS_CANCELED:
395                 if (!hsd->housekeeping)
396                         break;
397                 if (larr->arr_req_change + cdt->cdt_grace_delay <
398                     ktime_get_real_seconds()) {
399                         rc = mdt_hsm_complete_request(hsd->mti, llh, hdr);
400                         /* See ENOENT comment above */
401                         if (rc == -ENOENT)
402                                 RETURN(LLOG_DEL_RECORD);
403                 }
404                 break;
405         case ARS_FAILED:
406         case ARS_SUCCEED:
407                 if (!hsd->housekeeping)
408                         break;
409
410                 if ((larr->arr_req_change + cdt->cdt_grace_delay) <
411                     ktime_get_real_seconds()) {
412                         cdt_agent_record_hash_del(cdt,
413                                                   larr->arr_hai.hai_cookie);
414                         RETURN(LLOG_DEL_RECORD);
415                 }
416                 break;
417         }
418         RETURN(0);
419 }
420
421 /**
422  * create /proc entries for coordinator
423  * \param mdt [IN]
424  * \retval 0 success
425  * \retval -ve failure
426  */
427 int hsm_cdt_procfs_init(struct mdt_device *mdt)
428 {
429         struct coordinator      *cdt = &mdt->mdt_coordinator;
430         int                      rc = 0;
431         ENTRY;
432
433         /* init /proc entries, failure is not critical */
434         cdt->cdt_proc_dir = lprocfs_register("hsm",
435                                              mdt2obd_dev(mdt)->obd_proc_entry,
436                                              lprocfs_mdt_hsm_vars, mdt);
437         if (IS_ERR(cdt->cdt_proc_dir)) {
438                 rc = PTR_ERR(cdt->cdt_proc_dir);
439                 CERROR("%s: Cannot create 'hsm' directory in mdt proc dir,"
440                        " rc=%d\n", mdt_obd_name(mdt), rc);
441                 cdt->cdt_proc_dir = NULL;
442                 RETURN(rc);
443         }
444
445         RETURN(0);
446 }
447
448 /**
449  * remove /proc entries for coordinator
450  * \param mdt [IN]
451  */
452 void hsm_cdt_procfs_fini(struct mdt_device *mdt)
453 {
454         struct coordinator *cdt = &mdt->mdt_coordinator;
455
456         if (cdt->cdt_proc_dir != NULL)
457                 lprocfs_remove(&cdt->cdt_proc_dir);
458 }
459
460 /**
461  * get vector of hsm cdt /proc vars
462  * \param none
463  * \retval var vector
464  */
465 struct lprocfs_vars *hsm_cdt_get_proc_vars(void)
466 {
467         return lprocfs_mdt_hsm_vars;
468 }
469
470 /* Release the ressource used by the coordinator. Called when the
471  * coordinator is stopping. */
472 static void mdt_hsm_cdt_cleanup(struct mdt_device *mdt)
473 {
474         struct coordinator              *cdt = &mdt->mdt_coordinator;
475         struct cdt_agent_req            *car, *tmp1;
476         struct hsm_agent                *ha, *tmp2;
477         struct cdt_restore_handle       *crh, *tmp3;
478         struct mdt_thread_info          *cdt_mti;
479
480         /* start cleaning */
481         down_write(&cdt->cdt_request_lock);
482         list_for_each_entry_safe(car, tmp1, &cdt->cdt_request_list,
483                                  car_request_list) {
484                 cfs_hash_del(cdt->cdt_request_cookie_hash,
485                              &car->car_hai->hai_cookie,
486                              &car->car_cookie_hash);
487                 list_del(&car->car_request_list);
488                 mdt_cdt_put_request(car);
489         }
490         up_write(&cdt->cdt_request_lock);
491
492         down_write(&cdt->cdt_agent_lock);
493         list_for_each_entry_safe(ha, tmp2, &cdt->cdt_agents, ha_list) {
494                 list_del(&ha->ha_list);
495                 OBD_FREE_PTR(ha);
496         }
497         up_write(&cdt->cdt_agent_lock);
498
499         cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
500         mutex_lock(&cdt->cdt_restore_lock);
501         list_for_each_entry_safe(crh, tmp3, &cdt->cdt_restore_hdl, crh_list) {
502                 list_del(&crh->crh_list);
503                 /* give back layout lock */
504                 mdt_object_unlock(cdt_mti, NULL, &crh->crh_lh, 1);
505                 OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
506         }
507         mutex_unlock(&cdt->cdt_restore_lock);
508 }
509
/*
 * Coordinator state transition table, indexed on enum cdt_states, taking
 * from and to states. For instance since CDT_INIT to CDT_RUNNING is a
 * valid transition, cdt_transition[CDT_INIT][CDT_RUNNING] is true.
 *
 * The first index is the current state and the second the requested one:
 * set_cdt_state() looks up cdt_transition[state][new_state]. The row and
 * column labels below assume the enum order stopped, init, running,
 * disable, stopping; keep the table in sync with enum cdt_states.
 */
static bool cdt_transition[CDT_STATES_COUNT][CDT_STATES_COUNT] = {
        /* from -> to:    stopped init   running disable stopping */
        /* stopped */   { true,   true,  false,  false,  false },
        /* init */      { true,   false, true,   false,  false },
        /* running */   { false,  false, true,   true,   true },
        /* disable */   { false,  false, true,   true,   true },
        /* stopping */  { true,   false, false,  false,  false }
};
523
524 /**
525  * Change coordinator thread state
526  * Some combinations are not valid, so catch them here.
527  *
528  * Returns 0 on success, with old_state set if not NULL, or -EINVAL if
529  * the transition was not possible.
530  */
531 static int set_cdt_state(struct coordinator *cdt, enum cdt_states new_state,
532                          enum cdt_states *old_state)
533 {
534         int rc;
535         enum cdt_states state;
536
537         spin_lock(&cdt->cdt_state_lock);
538
539         state = cdt->cdt_state;
540
541         if (cdt_transition[state][new_state]) {
542                 cdt->cdt_state = new_state;
543                 spin_unlock(&cdt->cdt_state_lock);
544                 if (old_state)
545                         *old_state = state;
546                 rc = 0;
547         } else {
548                 spin_unlock(&cdt->cdt_state_lock);
549                 CDEBUG(D_HSM,
550                        "unexpected coordinator transition, from=%s, to=%s\n",
551                        cdt_mdt_state2str(state), cdt_mdt_state2str(new_state));
552                 rc = -EINVAL;
553         }
554
555         return rc;
556 }
557
558 /**
559  * coordinator thread
560  * \param data [IN] obd device
561  * \retval 0 success
562  * \retval -ve failure
563  */
564 static int mdt_coordinator(void *data)
565 {
566         struct hsm_thread_data  *thread_data = data;
567         struct mdt_thread_info  *mti = thread_data->cdt_mti;
568         struct mdt_device       *mdt = mti->mti_mdt;
569         struct coordinator      *cdt = &mdt->mdt_coordinator;
570         struct hsm_scan_data     hsd = { NULL };
571         time64_t                 last_housekeeping = 0;
572         int                      rc = 0;
573         int                      request_sz;
574         ENTRY;
575
576         /* set up hsd->request and max_requests */
577         hsd.max_requests = cdt->cdt_max_requests;
578         request_sz = hsd.max_requests * sizeof(*hsd.request);
579         hsd.request = thread_data->request;
580
581         CDEBUG(D_HSM, "%s: coordinator thread starting, pid=%d\n",
582                mdt_obd_name(mdt), current_pid());
583
584         hsd.mti = mti;
585         obd_uuid2fsname(hsd.fs_name, mdt_obd_name(mdt), MTI_NAME_MAXLEN);
586
587         set_cdt_state(cdt, CDT_RUNNING, NULL);
588
589         /* Inform mdt_hsm_cdt_start(). */
590         wake_up_all(&cdt->cdt_waitq);
591
592         while (1) {
593                 int i;
594                 int update_idx = 0;
595                 int updates_sz;
596                 int updates_cnt;
597                 struct hsm_record_update *updates;
598
599                 /* Limit execution of the expensive requests traversal
600                  * to at most one second. This prevents repeatedly
601                  * locking/unlocking the catalog for each request
602                  * and preventing other HSM operations from happening
603                  */
604                 wait_event_interruptible_timeout(cdt->cdt_waitq,
605                                                  kthread_should_stop() ||
606                                                  cdt->cdt_wakeup_coordinator,
607                                                  cfs_time_seconds(1));
608
609                 cdt->cdt_wakeup_coordinator = false;
610                 CDEBUG(D_HSM, "coordinator resumes\n");
611
612                 if (kthread_should_stop()) {
613                         CDEBUG(D_HSM, "Coordinator stops\n");
614                         rc = 0;
615                         break;
616                 }
617
618                 /* if coordinator is suspended continue to wait */
619                 if (cdt->cdt_state == CDT_DISABLE) {
620                         CDEBUG(D_HSM, "disable state, coordinator sleeps\n");
621                         continue;
622                 }
623
624                 /* If no event, and no housekeeping to do, continue to
625                  * wait. */
626                 if (last_housekeeping + cdt->cdt_loop_period <=
627                     ktime_get_real_seconds()) {
628                         last_housekeeping = ktime_get_real_seconds();
629                         hsd.housekeeping = true;
630                 } else if (cdt->cdt_event) {
631                         hsd.housekeeping = false;
632                 } else {
633                         continue;
634                 }
635
636                 cdt->cdt_event = false;
637
638                 CDEBUG(D_HSM, "coordinator starts reading llog\n");
639
640                 if (hsd.max_requests != cdt->cdt_max_requests) {
641                         /* cdt_max_requests has changed,
642                          * we need to allocate a new buffer
643                          */
644                         struct hsm_scan_request *tmp = NULL;
645                         int max_requests = cdt->cdt_max_requests;
646                         OBD_ALLOC_LARGE(tmp, max_requests *
647                                         sizeof(struct hsm_scan_request));
648                         if (!tmp) {
649                                 CERROR("Failed to resize request buffer, "
650                                        "keeping it at %d\n",
651                                        hsd.max_requests);
652                                 cdt->cdt_max_requests = hsd.max_requests;
653                         } else {
654                                 OBD_FREE_LARGE(hsd.request, request_sz);
655                                 hsd.max_requests = max_requests;
656                                 request_sz = hsd.max_requests *
657                                         sizeof(struct hsm_scan_request);
658                                 hsd.request = tmp;
659                         }
660                 }
661
662                 hsd.request_cnt = 0;
663
664                 rc = cdt_llog_process(mti->mti_env, mdt, mdt_coordinator_cb,
665                                       &hsd, 0, 0, WRITE);
666                 if (rc < 0)
667                         goto clean_cb_alloc;
668
669                 CDEBUG(D_HSM, "found %d requests to send\n", hsd.request_cnt);
670
671                 if (list_empty(&cdt->cdt_agents)) {
672                         CDEBUG(D_HSM, "no agent available, "
673                                       "coordinator sleeps\n");
674                         goto clean_cb_alloc;
675                 }
676
677                 /* Compute how many HAI we have in all the requests */
678                 updates_cnt = 0;
679                 for (i = 0; i < hsd.request_cnt; i++) {
680                         const struct hsm_scan_request *request =
681                                 &hsd.request[i];
682
683                         updates_cnt += request->hal->hal_count;
684                 }
685
686                 /* Allocate a temporary array to store the cookies to
687                  * update, and their status. */
688                 updates_sz = updates_cnt * sizeof(*updates);
689                 OBD_ALLOC(updates, updates_sz);
690                 if (updates == NULL) {
691                         CERROR("%s: Cannot allocate memory (%d o) "
692                                "for %d updates\n",
693                                mdt_obd_name(mdt), updates_sz, updates_cnt);
694                         continue;
695                 }
696
697                 /* here hsd contains a list of requests to be started */
698                 for (i = 0; i < hsd.request_cnt; i++) {
699                         struct hsm_scan_request *request = &hsd.request[i];
700                         struct hsm_action_list  *hal = request->hal;
701                         struct hsm_action_item  *hai;
702                         int                      j;
703
704                         /* still room for work ? */
705                         if (atomic_read(&cdt->cdt_request_count) >=
706                             cdt->cdt_max_requests)
707                                 break;
708
709                         rc = mdt_hsm_agent_send(mti, hal, 0);
710                         /* if failure, we suppose it is temporary
711                          * if the copy tool failed to do the request
712                          * it has to use hsm_progress
713                          */
714
715                         /* set up cookie vector to set records status
716                          * after copy tools start or failed
717                          */
718                         hai = hai_first(hal);
719                         for (j = 0; j < hal->hal_count; j++) {
720                                 updates[update_idx].cookie = hai->hai_cookie;
721                                 updates[update_idx].status =
722                                         (rc ? ARS_WAITING : ARS_STARTED);
723                                 hai = hai_next(hai);
724                                 update_idx++;
725                         }
726                 }
727
728                 if (update_idx) {
729                         rc = mdt_agent_record_update(mti->mti_env, mdt,
730                                                      updates, update_idx);
731                         if (rc)
732                                 CERROR("%s: mdt_agent_record_update() failed, "
733                                        "rc=%d, cannot update records "
734                                        "for %d cookies\n",
735                                        mdt_obd_name(mdt), rc, update_idx);
736                 }
737
738                 OBD_FREE(updates, updates_sz);
739
740 clean_cb_alloc:
741                 /* free hal allocated by callback */
742                 for (i = 0; i < hsd.request_cnt; i++) {
743                         struct hsm_scan_request *request = &hsd.request[i];
744
745                         OBD_FREE(request->hal, request->hal_sz);
746                 }
747         }
748
749         if (hsd.request)
750                 OBD_FREE_LARGE(hsd.request, request_sz);
751
752         mdt_hsm_cdt_cleanup(mdt);
753
754         if (rc != 0)
755                 CERROR("%s: coordinator thread exiting, process=%d, rc=%d\n",
756                        mdt_obd_name(mdt), current_pid(), rc);
757         else
758                 CDEBUG(D_HSM, "%s: coordinator thread exiting, process=%d,"
759                               " no error\n",
760                        mdt_obd_name(mdt), current_pid());
761
762         RETURN(rc);
763 }
764
765 /**
766  * lookup a restore handle by FID
767  * caller needs to hold cdt_restore_lock
768  * \param cdt [IN] coordinator
769  * \param fid [IN] FID
770  * \retval cdt_restore_handle found
771  * \retval NULL not found
772  */
773 struct cdt_restore_handle *mdt_hsm_restore_hdl_find(struct coordinator *cdt,
774                                                        const struct lu_fid *fid)
775 {
776         struct cdt_restore_handle       *crh;
777         ENTRY;
778
779         list_for_each_entry(crh, &cdt->cdt_restore_hdl, crh_list) {
780                 if (lu_fid_eq(&crh->crh_fid, fid))
781                         RETURN(crh);
782         }
783         RETURN(NULL);
784 }
785
/**
 * data passed to the llog processing callback hsm_restore_cb()
 * to rebuild the restore handles of pending restore requests
 */
struct hsm_restore_data {
        /* thread info giving the callback access to the env and the MDT */
        struct mdt_thread_info  *hrd_mti;
};
793
794 /**
795  *  llog_cat_process() callback, used to:
796  *  - find restore request and allocate the restore handle
797  * \param env [IN] environment
798  * \param llh [IN] llog handle
799  * \param hdr [IN] llog record
800  * \param data [IN/OUT] cb data = struct hsm_restore_data
801  * \retval 0 success
802  * \retval -ve failure
803  */
804 static int hsm_restore_cb(const struct lu_env *env,
805                           struct llog_handle *llh,
806                           struct llog_rec_hdr *hdr, void *data)
807 {
808         struct llog_agent_req_rec       *larr;
809         struct hsm_restore_data         *hrd;
810         struct cdt_restore_handle       *crh;
811         struct hsm_action_item          *hai;
812         struct mdt_thread_info          *mti;
813         struct coordinator              *cdt;
814         struct mdt_object               *child;
815         int rc;
816         ENTRY;
817
818         hrd = data;
819         mti = hrd->hrd_mti;
820         cdt = &mti->mti_mdt->mdt_coordinator;
821
822         larr = (struct llog_agent_req_rec *)hdr;
823         hai = &larr->arr_hai;
824         if (hai->hai_cookie > cdt->cdt_last_cookie)
825                 /* update the cookie to avoid collision */
826                 cdt->cdt_last_cookie = hai->hai_cookie + 1;
827
828         if (hai->hai_action != HSMA_RESTORE ||
829             agent_req_in_final_state(larr->arr_status))
830                 RETURN(0);
831
832         /* restore request not in a final state */
833
834         /* force replay of restore requests left in started state from previous
835          * CDT context, to be canceled later if finally found to be incompatible
836          * when being re-started */
837         if (larr->arr_status == ARS_STARTED) {
838                 larr->arr_status = ARS_WAITING;
839                 larr->arr_req_change = ktime_get_real_seconds();
840                 rc = llog_write(env, llh, hdr, hdr->lrh_index);
841                 if (rc != 0)
842                         GOTO(out, rc);
843         }
844
845         OBD_SLAB_ALLOC_PTR(crh, mdt_hsm_cdt_kmem);
846         if (crh == NULL)
847                 RETURN(-ENOMEM);
848
849         crh->crh_fid = hai->hai_fid;
850         /* in V1 all file is restored
851         crh->extent.start = hai->hai_extent.offset;
852         crh->extent.end = hai->hai_extent.offset + hai->hai_extent.length;
853         */
854         crh->crh_extent.start = 0;
855         crh->crh_extent.end = hai->hai_extent.length;
856         /* get the layout lock */
857         mdt_lock_reg_init(&crh->crh_lh, LCK_EX);
858         child = mdt_object_find_lock(mti, &crh->crh_fid, &crh->crh_lh,
859                                      MDS_INODELOCK_LAYOUT);
860         if (IS_ERR(child))
861                 GOTO(out, rc = PTR_ERR(child));
862
863         rc = 0;
864         /* we choose to not keep a reference
865          * on the object during the restore time which can be very long */
866         mdt_object_put(mti->mti_env, child);
867
868         mutex_lock(&cdt->cdt_restore_lock);
869         list_add_tail(&crh->crh_list, &cdt->cdt_restore_hdl);
870         mutex_unlock(&cdt->cdt_restore_lock);
871
872 out:
873         RETURN(rc);
874 }
875
876 /**
877  * restore coordinator state at startup
878  * the goal is to take a layout lock for each registered restore request
879  * \param mti [IN] context
880  */
881 static int mdt_hsm_pending_restore(struct mdt_thread_info *mti)
882 {
883         struct hsm_restore_data  hrd;
884         int                      rc;
885         ENTRY;
886
887         hrd.hrd_mti = mti;
888
889         rc = cdt_llog_process(mti->mti_env, mti->mti_mdt, hsm_restore_cb, &hrd,
890                               0, 0, WRITE);
891
892         RETURN(rc);
893 }
894
895 static int hsm_init_ucred(struct lu_ucred *uc)
896 {
897         ENTRY;
898
899         uc->uc_valid = UCRED_OLD;
900         uc->uc_o_uid = 0;
901         uc->uc_o_gid = 0;
902         uc->uc_o_fsuid = 0;
903         uc->uc_o_fsgid = 0;
904         uc->uc_uid = 0;
905         uc->uc_gid = 0;
906         uc->uc_fsuid = 0;
907         uc->uc_fsgid = 0;
908         uc->uc_suppgids[0] = -1;
909         uc->uc_suppgids[1] = -1;
910         uc->uc_cap = CFS_CAP_FS_MASK;
911         uc->uc_umask = 0777;
912         uc->uc_ginfo = NULL;
913         uc->uc_identity = NULL;
914
915         RETURN(0);
916 }
917
918 /**
919  * initialize coordinator struct
920  * \param mdt [IN] device
921  * \retval 0 success
922  * \retval -ve failure
923  */
924 int mdt_hsm_cdt_init(struct mdt_device *mdt)
925 {
926         struct coordinator      *cdt = &mdt->mdt_coordinator;
927         struct mdt_thread_info  *cdt_mti = NULL;
928         int                      rc;
929         ENTRY;
930
931         init_waitqueue_head(&cdt->cdt_waitq);
932         init_rwsem(&cdt->cdt_llog_lock);
933         init_rwsem(&cdt->cdt_agent_lock);
934         init_rwsem(&cdt->cdt_request_lock);
935         mutex_init(&cdt->cdt_restore_lock);
936         spin_lock_init(&cdt->cdt_state_lock);
937         set_cdt_state(cdt, CDT_STOPPED, NULL);
938
939         INIT_LIST_HEAD(&cdt->cdt_request_list);
940         INIT_LIST_HEAD(&cdt->cdt_agents);
941         INIT_LIST_HEAD(&cdt->cdt_restore_hdl);
942
943         cdt->cdt_request_cookie_hash = cfs_hash_create("REQUEST_COOKIE_HASH",
944                                                        CFS_HASH_BITS_MIN,
945                                                        CFS_HASH_BITS_MAX,
946                                                        CFS_HASH_BKT_BITS,
947                                                        0 /* extra bytes */,
948                                                        CFS_HASH_MIN_THETA,
949                                                        CFS_HASH_MAX_THETA,
950                                                 &cdt_request_cookie_hash_ops,
951                                                        CFS_HASH_DEFAULT);
952         if (cdt->cdt_request_cookie_hash == NULL)
953                 RETURN(-ENOMEM);
954
955         cdt->cdt_agent_record_hash = cfs_hash_create("AGENT_RECORD_HASH",
956                                                      CFS_HASH_BITS_MIN,
957                                                      CFS_HASH_BITS_MAX,
958                                                      CFS_HASH_BKT_BITS,
959                                                      0 /* extra bytes */,
960                                                      CFS_HASH_MIN_THETA,
961                                                      CFS_HASH_MAX_THETA,
962                                                      &cdt_agent_record_hash_ops,
963                                                      CFS_HASH_DEFAULT);
964         if (cdt->cdt_agent_record_hash == NULL)
965                 GOTO(out_request_cookie_hash, rc = -ENOMEM);
966
967         rc = lu_env_init(&cdt->cdt_env, LCT_MD_THREAD);
968         if (rc < 0)
969                 GOTO(out_agent_record_hash, rc);
970
971         /* for mdt_ucred(), lu_ucred stored in lu_ucred_key */
972         rc = lu_context_init(&cdt->cdt_session, LCT_SERVER_SESSION);
973         if (rc < 0)
974                 GOTO(out_env, rc);
975
976         lu_context_enter(&cdt->cdt_session);
977         cdt->cdt_env.le_ses = &cdt->cdt_session;
978
979         cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
980         LASSERT(cdt_mti != NULL);
981
982         cdt_mti->mti_env = &cdt->cdt_env;
983         cdt_mti->mti_mdt = mdt;
984
985         hsm_init_ucred(mdt_ucred(cdt_mti));
986
987         /* default values for /proc tunnables
988          * can be override by MGS conf */
989         cdt->cdt_default_archive_id = 1;
990         cdt->cdt_grace_delay = 60;
991         cdt->cdt_loop_period = 10;
992         cdt->cdt_max_requests = 3;
993         cdt->cdt_policy = CDT_DEFAULT_POLICY;
994         cdt->cdt_active_req_timeout = 3600;
995
996         /* Initialize cdt_compound_id here to allow its usage for
997          * delayed requests from RAoLU policy */
998         atomic_set(&cdt->cdt_compound_id, ktime_get_real_seconds());
999
1000         /* by default do not remove archives on last unlink */
1001         cdt->cdt_remove_archive_on_last_unlink = false;
1002
1003         RETURN(0);
1004
1005 out_env:
1006         lu_env_fini(&cdt->cdt_env);
1007 out_agent_record_hash:
1008         cfs_hash_putref(cdt->cdt_agent_record_hash);
1009         cdt->cdt_agent_record_hash = NULL;
1010 out_request_cookie_hash:
1011         cfs_hash_putref(cdt->cdt_request_cookie_hash);
1012         cdt->cdt_request_cookie_hash = NULL;
1013
1014         return rc;
1015 }
1016
1017 /**
1018  * free a coordinator thread
1019  * \param mdt [IN] device
1020  */
1021 int  mdt_hsm_cdt_fini(struct mdt_device *mdt)
1022 {
1023         struct coordinator *cdt = &mdt->mdt_coordinator;
1024         ENTRY;
1025
1026         lu_context_exit(cdt->cdt_env.le_ses);
1027         lu_context_fini(cdt->cdt_env.le_ses);
1028
1029         lu_env_fini(&cdt->cdt_env);
1030
1031         cfs_hash_putref(cdt->cdt_agent_record_hash);
1032         cdt->cdt_agent_record_hash = NULL;
1033
1034         cfs_hash_putref(cdt->cdt_request_cookie_hash);
1035         cdt->cdt_request_cookie_hash = NULL;
1036
1037         RETURN(0);
1038 }
1039
1040 /**
1041  * start a coordinator thread
1042  * \param mdt [IN] device
1043  * \retval 0 success
1044  * \retval -ve failure
1045  */
1046 static int mdt_hsm_cdt_start(struct mdt_device *mdt)
1047 {
1048         struct coordinator      *cdt = &mdt->mdt_coordinator;
1049         int                      rc;
1050         void                    *ptr;
1051         struct task_struct      *task;
1052         int                      request_sz;
1053         struct hsm_thread_data   thread_data;
1054         ENTRY;
1055
1056         /* functions defined but not yet used
1057          * this avoid compilation warning
1058          */
1059         ptr = dump_requests;
1060
1061         rc = set_cdt_state(cdt, CDT_INIT, NULL);
1062         if (rc) {
1063                 CERROR("%s: Coordinator already started or stopping\n",
1064                        mdt_obd_name(mdt));
1065                 RETURN(-EALREADY);
1066         }
1067
1068         CLASSERT(1 << (CDT_POLICY_SHIFT_COUNT - 1) == CDT_POLICY_LAST);
1069         cdt->cdt_policy = CDT_DEFAULT_POLICY;
1070
1071         /* just need to be larger than previous one */
1072         /* cdt_last_cookie is protected by cdt_llog_lock */
1073         cdt->cdt_last_cookie = ktime_get_real_seconds();
1074         atomic_set(&cdt->cdt_request_count, 0);
1075         atomic_set(&cdt->cdt_archive_count, 0);
1076         atomic_set(&cdt->cdt_restore_count, 0);
1077         atomic_set(&cdt->cdt_remove_count, 0);
1078         cdt->cdt_user_request_mask = (1UL << HSMA_RESTORE);
1079         cdt->cdt_group_request_mask = (1UL << HSMA_RESTORE);
1080         cdt->cdt_other_request_mask = (1UL << HSMA_RESTORE);
1081
1082         /* to avoid deadlock when start is made through /proc
1083          * /proc entries are created by the coordinator thread */
1084
1085         /* set up list of started restore requests */
1086         thread_data.cdt_mti =
1087                 lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
1088         rc = mdt_hsm_pending_restore(thread_data.cdt_mti);
1089         if (rc)
1090                 CERROR("%s: cannot take the layout locks needed"
1091                        " for registered restore: %d\n",
1092                        mdt_obd_name(mdt), rc);
1093
1094         if (mdt->mdt_bottom->dd_rdonly)
1095                 RETURN(0);
1096
1097         /* Allocate the initial hsd.request[] vector*/
1098         request_sz = cdt->cdt_max_requests * sizeof(struct hsm_scan_request);
1099         OBD_ALLOC_LARGE(thread_data.request, request_sz);
1100         if (!thread_data.request) {
1101                 set_cdt_state(cdt, CDT_STOPPED, NULL);
1102                 RETURN(-ENOMEM);
1103         }
1104
1105         task = kthread_run(mdt_coordinator, &thread_data, "hsm_cdtr");
1106         if (IS_ERR(task)) {
1107                 rc = PTR_ERR(task);
1108                 set_cdt_state(cdt, CDT_STOPPED, NULL);
1109                 OBD_FREE(thread_data.request, request_sz);
1110                 CERROR("%s: error starting coordinator thread: %d\n",
1111                        mdt_obd_name(mdt), rc);
1112         } else {
1113                 cdt->cdt_task = task;
1114                 wait_event(cdt->cdt_waitq,
1115                            cdt->cdt_state != CDT_INIT);
1116                 CDEBUG(D_HSM, "%s: coordinator thread started\n",
1117                        mdt_obd_name(mdt));
1118                 rc = 0;
1119         }
1120
1121         RETURN(rc);
1122 }
1123
1124 /**
1125  * stop a coordinator thread
1126  * \param mdt [IN] device
1127  */
1128 int mdt_hsm_cdt_stop(struct mdt_device *mdt)
1129 {
1130         struct coordinator *cdt = &mdt->mdt_coordinator;
1131         int rc;
1132
1133         ENTRY;
1134         /* stop coordinator thread */
1135         rc = set_cdt_state(cdt, CDT_STOPPING, NULL);
1136         if (rc == 0) {
1137                 kthread_stop(cdt->cdt_task);
1138                 cdt->cdt_task = NULL;
1139                 set_cdt_state(cdt, CDT_STOPPED, NULL);
1140         }
1141
1142         RETURN(rc);
1143 }
1144
1145 static int mdt_hsm_set_exists(struct mdt_thread_info *mti,
1146                               const struct lu_fid *fid,
1147                               u32 archive_id)
1148 {
1149         struct mdt_object *obj;
1150         struct md_hsm mh;
1151         int rc;
1152
1153         obj = mdt_hsm_get_md_hsm(mti, fid, &mh);
1154         if (IS_ERR(obj))
1155                 GOTO(out, rc = PTR_ERR(obj));
1156
1157         if (mh.mh_flags & HS_EXISTS &&
1158             mh.mh_arch_id == archive_id)
1159                 GOTO(out_obj, rc = 0);
1160
1161         mh.mh_flags |= HS_EXISTS;
1162         mh.mh_arch_id = archive_id;
1163         rc = mdt_hsm_attr_set(mti, obj, &mh);
1164
1165 out_obj:
1166         mdt_object_put(mti->mti_env, obj);
1167 out:
1168         return rc;
1169 }
1170
1171 /**
1172  * register all requests from an hal in the memory list
1173  * \param mti [IN] context
1174  * \param hal [IN] request
1175  * \param uuid [OUT] in case of CANCEL, the uuid of the agent
1176  *  which is running the CT
1177  * \retval 0 success
1178  * \retval -ve failure
1179  */
1180 int mdt_hsm_add_hal(struct mdt_thread_info *mti,
1181                     struct hsm_action_list *hal, struct obd_uuid *uuid)
1182 {
1183         struct mdt_device       *mdt = mti->mti_mdt;
1184         struct coordinator      *cdt = &mdt->mdt_coordinator;
1185         struct hsm_action_item  *hai;
1186         int                      rc = 0, i;
1187         ENTRY;
1188
1189         /* register request in memory list */
1190         hai = hai_first(hal);
1191         for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
1192                 struct cdt_agent_req *car;
1193
1194                 /* in case of a cancel request, we first mark the ondisk
1195                  * record of the request we want to stop as canceled
1196                  * this does not change the cancel record
1197                  * it will be done when updating the request status
1198                  */
1199                 if (hai->hai_action == HSMA_CANCEL) {
1200                         struct hsm_record_update update = {
1201                                 .cookie = hai->hai_cookie,
1202                                 .status = ARS_CANCELED,
1203                         };
1204
1205                         rc = mdt_agent_record_update(mti->mti_env, mti->mti_mdt,
1206                                                      &update, 1);
1207                         if (rc) {
1208                                 CERROR("%s: mdt_agent_record_update() failed, "
1209                                        "rc=%d, cannot update status to %s "
1210                                        "for cookie %#llx\n",
1211                                        mdt_obd_name(mdt), rc,
1212                                        agent_req_status2name(ARS_CANCELED),
1213                                        hai->hai_cookie);
1214                                 GOTO(out, rc);
1215                         }
1216
1217                         /* find the running request to set it canceled */
1218                         car = mdt_cdt_find_request(cdt, hai->hai_cookie);
1219                         if (car != NULL) {
1220                                 car->car_canceled = 1;
1221                                 car->car_delay_update = 1;
1222                                 /* uuid has to be changed to the one running the
1223                                 * request to cancel */
1224                                 *uuid = car->car_uuid;
1225                                 mdt_cdt_put_request(car);
1226                         }
1227                         /* no need to memorize cancel request
1228                          * this also avoid a deadlock when we receive
1229                          * a purge all requests command
1230                          */
1231                         continue;
1232                 }
1233
1234                 if (hai->hai_action == HSMA_ARCHIVE) {
1235                         rc = mdt_hsm_set_exists(mti, &hai->hai_fid,
1236                                                 hal->hal_archive_id);
1237                         if (rc == -ENOENT)
1238                                 continue;
1239                         else if (rc < 0)
1240                                 GOTO(out, rc);
1241                 }
1242
1243                 car = mdt_cdt_alloc_request(hal->hal_compound_id,
1244                                             hal->hal_archive_id, hal->hal_flags,
1245                                             uuid, hai);
1246                 if (IS_ERR(car))
1247                         GOTO(out, rc = PTR_ERR(car));
1248
1249                 rc = mdt_cdt_add_request(cdt, car);
1250                 if (rc != 0)
1251                         mdt_cdt_free_request(car);
1252         }
1253 out:
1254         RETURN(rc);
1255 }
1256
1257 /**
1258  * swap layouts between 2 fids
1259  * \param mti [IN] context
1260  * \param obj [IN]
1261  * \param dfid [IN]
1262  * \param mh_common [IN] MD HSM
1263  */
1264 static int hsm_swap_layouts(struct mdt_thread_info *mti,
1265                             struct mdt_object *obj, const struct lu_fid *dfid,
1266                             struct md_hsm *mh_common)
1267 {
1268         struct mdt_object       *dobj;
1269         struct mdt_lock_handle  *dlh;
1270         int                      rc;
1271         ENTRY;
1272
1273         if (!mdt_object_exists(obj))
1274                 GOTO(out, rc = -ENOENT);
1275
1276         /* we already have layout lock on obj so take only
1277          * on dfid */
1278         dlh = &mti->mti_lh[MDT_LH_OLD];
1279         mdt_lock_reg_init(dlh, LCK_EX);
1280         dobj = mdt_object_find_lock(mti, dfid, dlh, MDS_INODELOCK_LAYOUT);
1281         if (IS_ERR(dobj))
1282                 GOTO(out, rc = PTR_ERR(dobj));
1283
1284         /* if copy tool closes the volatile before sending the final
1285          * progress through llapi_hsm_copy_end(), all the objects
1286          * are removed and mdd_swap_layout LBUG */
1287         if (!mdt_object_exists(dobj)) {
1288                 CERROR("%s: Copytool has closed volatile file "DFID"\n",
1289                        mdt_obd_name(mti->mti_mdt), PFID(dfid));
1290                 GOTO(out_dobj, rc = -ENOENT);
1291         }
1292         /* Since we only handle restores here, unconditionally use
1293          * SWAP_LAYOUTS_MDS_HSM flag to ensure original layout will
1294          * be preserved in case of failure during swap_layout and not
1295          * leave a file in an intermediate but incoherent state.
1296          * But need to setup HSM xattr of data FID before, reuse
1297          * mti and mh presets for FID in hsm_cdt_request_completed(),
1298          * only need to clear RELEASED and DIRTY.
1299          */
1300         mh_common->mh_flags &= ~(HS_RELEASED | HS_DIRTY);
1301         rc = mdt_hsm_attr_set(mti, dobj, mh_common);
1302         if (rc == 0)
1303                 rc = mo_swap_layouts(mti->mti_env,
1304                                      mdt_object_child(obj),
1305                                      mdt_object_child(dobj),
1306                                      SWAP_LAYOUTS_MDS_HSM);
1307
1308 out_dobj:
1309         mdt_object_unlock_put(mti, dobj, dlh, 1);
1310 out:
1311         RETURN(rc);
1312 }
1313
/**
 * update status of a completed request
 *
 * Computes the final status of the request from the progress reported by
 * the copy tool, updates the HSM attributes of the file when needed, swaps
 * the layouts and releases the restore handle for a restore, and emits a
 * CL_HSM ChangeLog record.
 *
 * \param mti [IN] context
 * \param pgs [IN/OUT] progress of the copy tool; hpk_errval may be
 *  rewritten (ENOSYS becomes ECANCELED, layout swap failure is stored)
 * \param car [IN] in-memory request which has completed
 * \param status [OUT] resulting agent request status
 * \retval 0 success
 * \retval -ve failure
 */
static int hsm_cdt_request_completed(struct mdt_thread_info *mti,
                                     struct hsm_progress_kernel *pgs,
                                     const struct cdt_agent_req *car,
                                     enum agent_req_status *status)
{
        const struct lu_env     *env = mti->mti_env;
        struct mdt_device       *mdt = mti->mti_mdt;
        struct coordinator      *cdt = &mdt->mdt_coordinator;
        struct mdt_object       *obj = NULL;
        int                      cl_flags = 0, rc = 0;
        struct md_hsm            mh;
        bool                     is_mh_changed;
        bool                     need_changelog = true;
        ENTRY;

        /* default is to retry */
        *status = ARS_WAITING;

        /* find object by FID, mdt_hsm_get_md_hsm() returns obj or err
         * if error/removed continue anyway to get correct reporting done */
        obj = mdt_hsm_get_md_hsm(mti, &car->car_hai->hai_fid, &mh);
        /* we will update MD HSM only if needed */
        is_mh_changed = false;

        /* no need to change mh->mh_arch_id
         * mdt_hsm_get_md_hsm() got it from disk and it is still valid
         */
        if (pgs->hpk_errval != 0) {
                /* the copy tool reported a failure: pick the final status */
                switch (pgs->hpk_errval) {
                case ENOSYS:
                        /* the copy tool does not support cancel
                         * so the cancel request is failed
                         * As we cannot distinguish a cancel progress
                         * from another action progress (they have the
                         * same cookie), we suppose here the CT returns
                         * ENOSYS only if does not support cancel
                         */
                        /* this can also happen when cdt calls it
                         * for a timed out request */
                        *status = ARS_FAILED;
                        /* to have a cancel event in changelog */
                        pgs->hpk_errval = ECANCELED;
                        break;
                case ECANCELED:
                        /* the request record has already been set to
                         * ARS_CANCELED, this set the cancel request
                         * to ARS_SUCCEED */
                        *status = ARS_SUCCEED;
                        break;
                default:
                        /* retry only if current policy or requested, and
                         * object is not on error/removed */
                        *status = (cdt->cdt_policy & CDT_NORETRY_ACTION ||
                                   !(pgs->hpk_flags & HP_FLAG_RETRY) ||
                                   IS_ERR(obj)) ? ARS_FAILED : ARS_WAITING;
                        break;
                }

                /* record the error code in the ChangeLog flags, or an
                 * overflow marker when it is larger than CLF_HSM_MAXERROR */
                if (pgs->hpk_errval > CLF_HSM_MAXERROR) {
                        CERROR("%s: Request %#llx on "DFID
                               " failed, error code %d too large\n",
                               mdt_obd_name(mdt),
                               pgs->hpk_cookie, PFID(&pgs->hpk_fid),
                               pgs->hpk_errval);
                        hsm_set_cl_error(&cl_flags,
                                         CLF_HSM_ERROVERFLOW);
                        rc = -EINVAL;
                } else {
                        hsm_set_cl_error(&cl_flags, pgs->hpk_errval);
                }

                /* record which action failed in the ChangeLog flags */
                switch (car->car_hai->hai_action) {
                case HSMA_ARCHIVE:
                        hsm_set_cl_event(&cl_flags, HE_ARCHIVE);
                        break;
                case HSMA_RESTORE:
                        hsm_set_cl_event(&cl_flags, HE_RESTORE);
                        break;
                case HSMA_REMOVE:
                        hsm_set_cl_event(&cl_flags, HE_REMOVE);
                        break;
                case HSMA_CANCEL:
                        hsm_set_cl_event(&cl_flags, HE_CANCEL);
                        CERROR("%s: Failed request %#llx on "DFID
                               " cannot be a CANCEL\n",
                               mdt_obd_name(mdt),
                               pgs->hpk_cookie,
                               PFID(&pgs->hpk_fid));
                        break;
                default:
                        CERROR("%s: Failed request %#llx on "DFID
                               " %d is an unknown action\n",
                               mdt_obd_name(mdt),
                               pgs->hpk_cookie, PFID(&pgs->hpk_fid),
                               car->car_hai->hai_action);
                        rc = -EINVAL;
                        break;
                }
        } else {
                /* the copy tool reported success: update the HSM state
                 * according to the action which has been done */
                *status = ARS_SUCCEED;
                switch (car->car_hai->hai_action) {
                case HSMA_ARCHIVE:
                        hsm_set_cl_event(&cl_flags, HE_ARCHIVE);
                        /* set ARCHIVE keep EXIST and clear LOST and
                         * DIRTY */
                        mh.mh_arch_ver = pgs->hpk_data_version;
                        mh.mh_flags |= HS_ARCHIVED;
                        mh.mh_flags &= ~(HS_LOST|HS_DIRTY);
                        is_mh_changed = true;
                        break;
                case HSMA_RESTORE:
                        hsm_set_cl_event(&cl_flags, HE_RESTORE);

                        /* do not clear RELEASED and DIRTY here
                         * this will occur in hsm_swap_layouts()
                         */

                        /* Restoring has changed the file version on
                         * disk. */
                        mh.mh_arch_ver = pgs->hpk_data_version;
                        is_mh_changed = true;
                        break;
                case HSMA_REMOVE:
                        hsm_set_cl_event(&cl_flags, HE_REMOVE);
                        /* clear ARCHIVED EXISTS and LOST */
                        mh.mh_flags &= ~(HS_ARCHIVED | HS_EXISTS | HS_LOST);
                        is_mh_changed = true;
                        break;
                case HSMA_CANCEL:
                        hsm_set_cl_event(&cl_flags, HE_CANCEL);
                        CERROR("%s: Successful request %#llx on "DFID" cannot be a CANCEL\n",
                               mdt_obd_name(mdt),
                               pgs->hpk_cookie,
                               PFID(&pgs->hpk_fid));
                        break;
                default:
                        CERROR("%s: Successful request %#llx on "DFID" %d is an unknown action\n",
                               mdt_obd_name(mdt),
                               pgs->hpk_cookie, PFID(&pgs->hpk_fid),
                               car->car_hai->hai_action);
                        rc = -EINVAL;
                        break;
                }
        }

        /* rc != 0 means error when analysing action, it may come from
         * a crazy CT no need to manage DIRTY
         * and if mdt_hsm_get_md_hsm() has returned an error, mh has not been
         * filled
         */
        if (rc == 0 && !IS_ERR(obj))
                hsm_set_cl_flags(&cl_flags,
                                 mh.mh_flags & HS_DIRTY ? CLF_HSM_DIRTY : 0);

        /* unlock is done later, after layout lock management */
        if (is_mh_changed && !IS_ERR(obj))
                rc = mdt_hsm_attr_set(mti, obj, &mh);

        /* we give back layout lock only if restore was successful or
         * if no retry will be attempted and if object is still alive,
         * in other cases we just unlock the object */
        if (car->car_hai->hai_action == HSMA_RESTORE) {
                struct cdt_restore_handle       *crh;

                /* restore in data FID done, we swap the layouts
                 * only if restore is successful */
                if (pgs->hpk_errval == 0 && !IS_ERR(obj)) {
                        rc = hsm_swap_layouts(mti, obj, &car->car_hai->hai_dfid,
                                              &mh);
                        if (rc) {
                                /* the swap failure is reported through
                                 * hpk_errval as a positive errno */
                                if (cdt->cdt_policy & CDT_NORETRY_ACTION)
                                        *status = ARS_FAILED;
                                pgs->hpk_errval = -rc;
                        }
                }
                /* we have to retry, so keep layout lock */
                if (*status == ARS_WAITING)
                        GOTO(out, rc);

                /* restore special case, need to create ChangeLog record
                 * before to give back layout lock to avoid concurrent
                 * file updater to post out of order ChangeLog */
                mo_changelog(env, CL_HSM, cl_flags, mdt->mdt_child,
                             &car->car_hai->hai_fid);
                need_changelog = false;

                /* give back layout lock */
                mutex_lock(&cdt->cdt_restore_lock);
                crh = mdt_hsm_restore_hdl_find(cdt, &car->car_hai->hai_fid);
                if (crh != NULL)
                        list_del(&crh->crh_list);
                mutex_unlock(&cdt->cdt_restore_lock);
                /* Just give back layout lock, we keep the reference
                 * which is given back later with the lock for HSM
                 * flags.
                 * XXX obj may be invalid so we do not pass it. */
                if (crh != NULL)
                        mdt_object_unlock(mti, NULL, &crh->crh_lh, 1);

                /* the handle is off the list, release it */
                if (crh != NULL)
                        OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
        }

        GOTO(out, rc);

out:
        /* always add a ChangeLog record (unless the restore path above
         * has already emitted it) */
        if (need_changelog)
                mo_changelog(env, CL_HSM, cl_flags, mdt->mdt_child,
                             &car->car_hai->hai_fid);

        /* drop the reference on the object, if one was obtained */
        if (!IS_ERR(obj))
                mdt_object_put(mti->mti_env, obj);

        RETURN(rc);
}
1538
1539 /**
1540  * update status of a request
1541  * \param mti [IN] context
1542  * \param pgs [IN] progress of the copy tool
1543  * \param update_record [IN] update llog record
1544  * \retval 0 success
1545  * \retval -ve failure
1546  */
1547 int mdt_hsm_update_request_state(struct mdt_thread_info *mti,
1548                                  struct hsm_progress_kernel *pgs,
1549                                  const int update_record)
1550 {
1551         struct mdt_device       *mdt = mti->mti_mdt;
1552         struct coordinator      *cdt = &mdt->mdt_coordinator;
1553         struct cdt_agent_req    *car;
1554         int                      rc = 0;
1555         ENTRY;
1556
1557         /* no coordinator started, so we cannot serve requests */
1558         if (cdt->cdt_state == CDT_STOPPED)
1559                 RETURN(-EAGAIN);
1560
1561         /* first do sanity checks */
1562         car = mdt_cdt_update_request(cdt, pgs);
1563         if (IS_ERR(car)) {
1564                 CERROR("%s: Cannot find running request for cookie %#llx"
1565                        " on fid="DFID"\n",
1566                        mdt_obd_name(mdt),
1567                        pgs->hpk_cookie, PFID(&pgs->hpk_fid));
1568
1569                 RETURN(PTR_ERR(car));
1570         }
1571
1572         /* wait for update request from copytool if hsm cancel is
1573          * initiated by user. If update request is as a result of
1574          * kill of copytool or eviction data mover node then skip
1575          * delaying and update the request as copytool is unavailable
1576          * to send the update request
1577          */
1578         if (car->car_delay_update == 1) {
1579                 if (pgs->hpk_flags & HP_FLAG_COMPLETE_DELAYED)
1580                         car->car_delay_update = 0;
1581                 else
1582                         GOTO(out, rc = -ECANCELED);
1583         }
1584
1585         CDEBUG(D_HSM, "Progress received for fid="DFID" cookie=%#llx"
1586                       " action=%s flags=%d err=%d fid="DFID" dfid="DFID"\n",
1587                       PFID(&pgs->hpk_fid), pgs->hpk_cookie,
1588                       hsm_copytool_action2name(car->car_hai->hai_action),
1589                       pgs->hpk_flags, pgs->hpk_errval,
1590                       PFID(&car->car_hai->hai_fid),
1591                       PFID(&car->car_hai->hai_dfid));
1592
1593         /* progress is done on FID or data FID depending of the action and
1594          * of the copy progress */
1595         /* for restore progress is used to send back the data FID to cdt */
1596         if (car->car_hai->hai_action == HSMA_RESTORE &&
1597             lu_fid_eq(&car->car_hai->hai_fid, &car->car_hai->hai_dfid))
1598                 car->car_hai->hai_dfid = pgs->hpk_fid;
1599
1600         if ((car->car_hai->hai_action == HSMA_RESTORE ||
1601              car->car_hai->hai_action == HSMA_ARCHIVE) &&
1602             (!lu_fid_eq(&pgs->hpk_fid, &car->car_hai->hai_dfid) &&
1603              !lu_fid_eq(&pgs->hpk_fid, &car->car_hai->hai_fid))) {
1604                 CERROR("%s: Progress on "DFID" for cookie %#llx"
1605                        " does not match request FID "DFID" nor data FID "
1606                        DFID"\n",
1607                        mdt_obd_name(mdt),
1608                        PFID(&pgs->hpk_fid), pgs->hpk_cookie,
1609                        PFID(&car->car_hai->hai_fid),
1610                        PFID(&car->car_hai->hai_dfid));
1611                 GOTO(out, rc = -EINVAL);
1612         }
1613
1614         if (pgs->hpk_errval != 0 && !(pgs->hpk_flags & HP_FLAG_COMPLETED)) {
1615                 CERROR("%s: Progress on "DFID" for cookie %#llx action=%s"
1616                        " is not coherent (err=%d and not completed"
1617                        " (flags=%d))\n",
1618                        mdt_obd_name(mdt),
1619                        PFID(&pgs->hpk_fid), pgs->hpk_cookie,
1620                        hsm_copytool_action2name(car->car_hai->hai_action),
1621                        pgs->hpk_errval, pgs->hpk_flags);
1622                 GOTO(out, rc = -EINVAL);
1623         }
1624
1625         /* now progress is valid */
1626
1627         /* we use a root like ucred */
1628         hsm_init_ucred(mdt_ucred(mti));
1629
1630         if (pgs->hpk_flags & HP_FLAG_COMPLETED) {
1631                 enum agent_req_status    status;
1632
1633                 rc = hsm_cdt_request_completed(mti, pgs, car, &status);
1634
1635                 CDEBUG(D_HSM, "%s record: fid="DFID" cookie=%#llx action=%s "
1636                               "status=%s\n",
1637                        update_record ? "Updating" : "Not updating",
1638                        PFID(&pgs->hpk_fid), pgs->hpk_cookie,
1639                        hsm_copytool_action2name(car->car_hai->hai_action),
1640                        agent_req_status2name(status));
1641
1642                 /* update record first (LU-9075) */
1643                 if (update_record) {
1644                         int rc1;
1645                         struct hsm_record_update update = {
1646                                 .cookie = pgs->hpk_cookie,
1647                                 .status = status,
1648                         };
1649
1650                         rc1 = mdt_agent_record_update(mti->mti_env, mdt,
1651                                                       &update, 1);
1652                         if (rc1)
1653                                 CERROR("%s: mdt_agent_record_update() failed,"
1654                                        " rc=%d, cannot update status to %s"
1655                                        " for cookie %#llx\n",
1656                                        mdt_obd_name(mdt), rc1,
1657                                        agent_req_status2name(status),
1658                                        pgs->hpk_cookie);
1659                         rc = (rc != 0 ? rc : rc1);
1660                 }
1661
1662                 /* then remove request from memory list (LU-9075) */
1663                 mdt_cdt_remove_request(cdt, pgs->hpk_cookie);
1664
1665                 /* ct has completed a request, so a slot is available,
1666                  * signal the coordinator to find new work */
1667                 mdt_hsm_cdt_event(cdt);
1668         } else {
1669                 /* if copytool send a progress on a canceled request
1670                  * we inform copytool it should stop
1671                  */
1672                 if (car->car_canceled == 1)
1673                         rc = -ECANCELED;
1674         }
1675         GOTO(out, rc);
1676
1677 out:
1678         /* remove ref got from mdt_cdt_update_request() */
1679         mdt_cdt_put_request(car);
1680
1681         return rc;
1682 }
1683
1684
/**
 * argument handed to the llog_cat_process() callback
 * when canceling requests
 */
struct hsm_cancel_all_data {
	struct mdt_device *mdt;
};
1692
1693 /**
1694  *  llog_cat_process() callback, used to:
1695  *  - purge all requests
1696  * \param env [IN] environment
1697  * \param llh [IN] llog handle
1698  * \param hdr [IN] llog record
1699  * \param data [IN] cb data = struct hsm_cancel_all_data
1700  * \retval 0 success
1701  * \retval -ve failure
1702  */
1703 static int mdt_cancel_all_cb(const struct lu_env *env,
1704                              struct llog_handle *llh,
1705                              struct llog_rec_hdr *hdr, void *data)
1706 {
1707         struct llog_agent_req_rec       *larr;
1708         struct hsm_cancel_all_data      *hcad;
1709         int                              rc = 0;
1710         ENTRY;
1711
1712         larr = (struct llog_agent_req_rec *)hdr;
1713         hcad = data;
1714         if (larr->arr_status == ARS_WAITING ||
1715             larr->arr_status == ARS_STARTED) {
1716                 larr->arr_status = ARS_CANCELED;
1717                 larr->arr_req_change = ktime_get_real_seconds();
1718                 rc = llog_write(env, llh, hdr, hdr->lrh_index);
1719         }
1720
1721         RETURN(rc);
1722 }
1723
1724 /**
1725  * prepare cancel request
1726  * \param hal [IN] pointer to allocate memory
1727  * \param car [IN] coordinator agent request
1728  * \param mdt_obd_name [IN] mdt object device name
1729  * \param hal_sz [IN, OUT] old size of hal_sz buffer
1730  */
1731 static struct hsm_action_list *
1732 hsm_create_cancel_request(struct hsm_action_list *hal,
1733                           struct cdt_agent_req *car,
1734                           char *mdt_obd_name, int *hal_sz)
1735 {
1736         struct hsm_action_item *hai;
1737         int hal_sz_needed;
1738
1739         /* needed size */
1740         hal_sz_needed = sizeof(*hal) + cfs_size_round(MTI_NAME_MAXLEN + 1) +
1741                   cfs_size_round(car->car_hai->hai_len);
1742
1743         if (hal_sz_needed > *hal_sz) {
1744                 /* not enough room, free old buffer */
1745                 if (hal != NULL)
1746                         OBD_FREE(hal, *hal_sz);
1747                 *hal_sz = hal_sz_needed;
1748                 OBD_ALLOC(hal, *hal_sz);
1749                 if (hal == NULL) {
1750                         CERROR("Cannot allocate memory for hal\n");
1751                         RETURN(NULL);
1752                 }
1753         }
1754
1755         hal->hal_version = HAL_VERSION;
1756         obd_uuid2fsname(hal->hal_fsname, mdt_obd_name,
1757                         MTI_NAME_MAXLEN);
1758         hal->hal_fsname[MTI_NAME_MAXLEN] = '\0';
1759         hal->hal_compound_id = car->car_compound_id;
1760         hal->hal_archive_id = car->car_archive_id;
1761         hal->hal_flags = car->car_flags;
1762         hal->hal_count = 0;
1763
1764         hai = hai_first(hal);
1765         memcpy(hai, car->car_hai, car->car_hai->hai_len);
1766         hai->hai_action = HSMA_CANCEL;
1767         hal->hal_count = 1;
1768
1769         RETURN(hal);
1770 }
1771
1772 /**
1773  * cancel actions running on a agent
1774  * \param mdt [IN] MDT device
1775  * \param uuid [IN] the obd_uuid of the agent whose requests are to be canceled
1776  */
1777 int hsm_cancel_agent_requests(struct mdt_device *mdt,
1778                               const struct obd_uuid *uuid)
1779 {
1780         struct mdt_thread_info *mti;
1781         struct coordinator *cdt = &mdt->mdt_coordinator;
1782         struct cdt_agent_req *car;
1783         int rc = 0;
1784         enum cdt_states save_state;
1785         struct hsm_record_update update;
1786         ENTRY;
1787
1788         /* retrieve coordinator context */
1789         mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
1790
1791         /* disable coordinator */
1792         save_state = cdt->cdt_state;
1793         cdt->cdt_state = CDT_DISABLE;
1794
1795         down_read(&cdt->cdt_request_lock);
1796         list_for_each_entry(car, &cdt->cdt_request_list, car_request_list) {
1797                 mdt_cdt_get_request(car);
1798                 /* request is not yet removed from list, it will be done
1799                  * when copytool will return progress
1800                  */
1801                 if (!obd_uuid_equals(&car->car_uuid, uuid)) {
1802                         mdt_cdt_put_request(car);
1803                         continue;
1804                 }
1805
1806                 if (car->car_hai->hai_action == HSMA_CANCEL) {
1807                         mdt_cdt_put_request(car);
1808                         continue;
1809                 }
1810
1811                 update.cookie = car->car_hai->hai_cookie;
1812                 update.status = ARS_CANCELED;
1813
1814                 rc = mdt_agent_record_update(mti->mti_env, mti->mti_mdt,
1815                                              &update, 1);
1816
1817                 if (rc == 0)
1818                         car->car_canceled = 1;
1819                 else
1820                         CERROR("%s: mdt_agent_record_update() failed, "
1821                                "rc=%d, cannot update status to %s "
1822                                "for cookie %#llx\n",
1823                                mdt_obd_name(mdt), rc,
1824                                agent_req_status2name(ARS_CANCELED),
1825                                car->car_hai->hai_cookie);
1826
1827                 mdt_cdt_put_request(car);
1828         }
1829         up_read(&cdt->cdt_request_lock);
1830
1831         /* enable coordinator */
1832         cdt->cdt_state = save_state;
1833
1834         RETURN(rc);
1835 }
1836
1837 /**
1838  * cancel all actions
1839  * \param obd [IN] MDT device
1840  */
1841 static int hsm_cancel_all_actions(struct mdt_device *mdt)
1842 {
1843         struct lu_env                    env;
1844         struct lu_context                session;
1845         struct mdt_thread_info          *mti;
1846         struct coordinator              *cdt = &mdt->mdt_coordinator;
1847         struct cdt_agent_req            *car;
1848         struct hsm_action_list          *hal = NULL;
1849         struct hsm_cancel_all_data       hcad;
1850         int                              hal_sz = 0, rc;
1851         enum cdt_states                  old_state;
1852         ENTRY;
1853
1854         rc = lu_env_init(&env, LCT_MD_THREAD);
1855         if (rc < 0)
1856                 RETURN(rc);
1857
1858         /* for mdt_ucred(), lu_ucred stored in lu_ucred_key */
1859         rc = lu_context_init(&session, LCT_SERVER_SESSION);
1860         if (rc < 0)
1861                 GOTO(out_env, rc);
1862
1863         lu_context_enter(&session);
1864         env.le_ses = &session;
1865
1866         mti = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
1867         LASSERT(mti != NULL);
1868
1869         mti->mti_env = &env;
1870         mti->mti_mdt = mdt;
1871
1872         hsm_init_ucred(mdt_ucred(mti));
1873
1874         /* disable coordinator */
1875         rc = set_cdt_state(cdt, CDT_DISABLE, &old_state);
1876         if (rc)
1877                 RETURN(rc);
1878
1879         /* send cancel to all running requests */
1880         down_read(&cdt->cdt_request_lock);
1881         list_for_each_entry(car, &cdt->cdt_request_list, car_request_list) {
1882                 mdt_cdt_get_request(car);
1883                 /* request is not yet removed from list, it will be done
1884                  * when copytool will return progress
1885                  */
1886
1887                 if (car->car_hai->hai_action == HSMA_CANCEL) {
1888                         mdt_cdt_put_request(car);
1889                         continue;
1890                 }
1891
1892                 hal = hsm_create_cancel_request(hal, car,
1893                                                 mdt_obd_name(mdt),
1894                                                 &hal_sz);
1895                 if (hal == NULL) {
1896                         mdt_cdt_put_request(car);
1897                         up_read(&cdt->cdt_request_lock);
1898                         GOTO(out_cdt_state, rc = -ENOMEM);
1899                 }
1900
1901                 /* it is possible to safely call mdt_hsm_agent_send()
1902                  * (ie without a deadlock on cdt_request_lock), because the
1903                  * write lock is taken only if we are not in purge mode
1904                  * (mdt_hsm_agent_send() does not call mdt_cdt_add_request()
1905                  *   nor mdt_cdt_remove_request())
1906                  */
1907                 /* no conflict with cdt thread because cdt is disable and we
1908                  * have the request lock */
1909                 mdt_hsm_agent_send(mti, hal, true);
1910
1911                 mdt_cdt_put_request(car);
1912         }
1913         up_read(&cdt->cdt_request_lock);
1914
1915         if (hal != NULL)
1916                 OBD_FREE(hal, hal_sz);
1917
1918         /* cancel all on-disk records */
1919         hcad.mdt = mdt;
1920
1921         rc = cdt_llog_process(mti->mti_env, mti->mti_mdt, mdt_cancel_all_cb,
1922                               &hcad, 0, 0, WRITE);
1923 out_cdt_state:
1924         /* Enable coordinator, unless the coordinator was stopping. */
1925         set_cdt_state(cdt, old_state, NULL);
1926         lu_context_exit(&session);
1927         lu_context_fini(&session);
1928 out_env:
1929         lu_env_fini(&env);
1930
1931         RETURN(rc);
1932 }
1933
1934 /**
1935  * check if a request is compatible with file status
1936  * \param hai [IN] request description
1937  * \param archive_id [IN] request archive id
1938  * \param rq_flags [IN] request flags
1939  * \param hsm [IN] file HSM metadata
1940  * \retval boolean
1941  */
1942 bool mdt_hsm_is_action_compat(const struct hsm_action_item *hai,
1943                               u32 archive_id, u64 rq_flags,
1944                               const struct md_hsm *hsm)
1945 {
1946         int      is_compat = false;
1947         int      hsm_flags;
1948         ENTRY;
1949
1950         hsm_flags = hsm->mh_flags;
1951         switch (hai->hai_action) {
1952         case HSMA_ARCHIVE:
1953                 if (!(hsm_flags & HS_NOARCHIVE) &&
1954                     (hsm_flags & HS_DIRTY || !(hsm_flags & HS_ARCHIVED)))
1955                         is_compat = true;
1956
1957                 if (hsm_flags & HS_EXISTS &&
1958                     archive_id != 0 &&
1959                     archive_id != hsm->mh_arch_id)
1960                         is_compat = false;
1961
1962                 break;
1963         case HSMA_RESTORE:
1964                 if (!(hsm_flags & HS_DIRTY) && (hsm_flags & HS_RELEASED) &&
1965                     hsm_flags & HS_ARCHIVED && !(hsm_flags & HS_LOST))
1966                         is_compat = true;
1967                 break;
1968         case HSMA_REMOVE:
1969                 if (!(hsm_flags & HS_RELEASED) &&
1970                     (hsm_flags & (HS_ARCHIVED | HS_EXISTS)))
1971                         is_compat = true;
1972                 break;
1973         case HSMA_CANCEL:
1974                 is_compat = true;
1975                 break;
1976         }
1977         CDEBUG(D_HSM, "fid="DFID" action=%s flags=%#llx"
1978                       " extent=%#llx-%#llx hsm_flags=%.8X %s\n",
1979                       PFID(&hai->hai_fid),
1980                       hsm_copytool_action2name(hai->hai_action), rq_flags,
1981                       hai->hai_extent.offset, hai->hai_extent.length,
1982                       hsm->mh_flags,
1983                       (is_compat ? "compatible" : "uncompatible"));
1984
1985         RETURN(is_compat);
1986 }
1987
1988 /*
1989  * /proc interface used to get/set HSM behaviour (cdt->cdt_policy)
1990  */
1991 static const struct {
1992         __u64            bit;
1993         char            *name;
1994         char            *nickname;
1995 } hsm_policy_names[] = {
1996         { CDT_NONBLOCKING_RESTORE,      "NonBlockingRestore",   "NBR"},
1997         { CDT_NORETRY_ACTION,           "NoRetryAction",        "NRA"},
1998         { 0 },
1999 };
2000
2001 /**
2002  * convert a policy name to a bit
2003  * \param name [IN] policy name
2004  * \retval 0 unknown
2005  * \retval   policy bit
2006  */
2007 static __u64 hsm_policy_str2bit(const char *name)
2008 {
2009         int      i;
2010
2011         for (i = 0; hsm_policy_names[i].bit != 0; i++)
2012                 if (strcmp(hsm_policy_names[i].nickname, name) == 0 ||
2013                     strcmp(hsm_policy_names[i].name, name) == 0)
2014                         return hsm_policy_names[i].bit;
2015         return 0;
2016 }
2017
2018 /**
2019  * convert a policy bit field to a string
2020  * \param mask [IN] policy bit field
2021  * \param hexa [IN] print mask before bit names
2022  * \param buffer [OUT] string
2023  * \param count [IN] size of buffer
2024  */
2025 static void hsm_policy_bit2str(struct seq_file *m, const __u64 mask,
2026                                 const bool hexa)
2027 {
2028         int      i, j;
2029         __u64    bit;
2030         ENTRY;
2031
2032         if (hexa)
2033                 seq_printf(m, "(%#llx) ", mask);
2034
2035         for (i = 0; i < CDT_POLICY_SHIFT_COUNT; i++) {
2036                 bit = (1ULL << i);
2037
2038                 for (j = 0; hsm_policy_names[j].bit != 0; j++) {
2039                         if (hsm_policy_names[j].bit == bit)
2040                                 break;
2041                 }
2042                 if (bit & mask)
2043                         seq_printf(m, "[%s] ", hsm_policy_names[j].name);
2044                 else
2045                         seq_printf(m, "%s ", hsm_policy_names[j].name);
2046         }
2047         /* remove last ' ' */
2048         m->count--;
2049         seq_putc(m, '\n');
2050 }
2051
2052 /* methods to read/write HSM policy flags */
2053 static int mdt_hsm_policy_seq_show(struct seq_file *m, void *data)
2054 {
2055         struct mdt_device       *mdt = m->private;
2056         struct coordinator      *cdt = &mdt->mdt_coordinator;
2057         ENTRY;
2058
2059         hsm_policy_bit2str(m, cdt->cdt_policy, false);
2060         RETURN(0);
2061 }
2062
2063 static ssize_t
2064 mdt_hsm_policy_seq_write(struct file *file, const char __user *buffer,
2065                          size_t count, loff_t *off)
2066 {
2067         struct seq_file         *m = file->private_data;
2068         struct mdt_device       *mdt = m->private;
2069         struct coordinator      *cdt = &mdt->mdt_coordinator;
2070         char                    *start, *token, sign;
2071         char                    *buf;
2072         __u64                    policy;
2073         __u64                    add_mask, remove_mask, set_mask;
2074         int                      rc;
2075         ENTRY;
2076
2077         if (count + 1 > PAGE_SIZE)
2078                 RETURN(-EINVAL);
2079
2080         OBD_ALLOC(buf, count + 1);
2081         if (buf == NULL)
2082                 RETURN(-ENOMEM);
2083
2084         if (copy_from_user(buf, buffer, count))
2085                 GOTO(out, rc = -EFAULT);
2086
2087         buf[count] = '\0';
2088
2089         start = buf;
2090         CDEBUG(D_HSM, "%s: receive new policy: '%s'\n", mdt_obd_name(mdt),
2091                start);
2092
2093         add_mask = remove_mask = set_mask = 0;
2094         do {
2095                 token = strsep(&start, "\n ");
2096                 sign = *token;
2097
2098                 if (sign == '\0')
2099                         continue;
2100
2101                 if (sign == '-' || sign == '+')
2102                         token++;
2103
2104                 policy = hsm_policy_str2bit(token);
2105                 if (policy == 0) {
2106                         CWARN("%s: '%s' is unknown, "
2107                               "supported policies are:\n", mdt_obd_name(mdt),
2108                               token);
2109                         hsm_policy_bit2str(m, 0, false);
2110                         GOTO(out, rc = -EINVAL);
2111                 }
2112                 switch (sign) {
2113                 case '-':
2114                         remove_mask |= policy;
2115                         break;
2116                 case '+':
2117                         add_mask |= policy;
2118                         break;
2119                 default:
2120                         set_mask |= policy;
2121                         break;
2122                 }
2123
2124         } while (start != NULL);
2125
2126         CDEBUG(D_HSM, "%s: new policy: rm=%#llx add=%#llx set=%#llx\n",
2127                mdt_obd_name(mdt), remove_mask, add_mask, set_mask);
2128
2129         /* if no sign in all string, it is a clear and set
2130          * if some sign found, all unsigned are converted
2131          * to add
2132          * P1 P2 = set to P1 and P2
2133          * P1 -P2 = add P1 clear P2 same as +P1 -P2
2134          */
2135         if (remove_mask == 0 && add_mask == 0) {
2136                 cdt->cdt_policy = set_mask;
2137         } else {
2138                 cdt->cdt_policy |= set_mask | add_mask;
2139                 cdt->cdt_policy &= ~remove_mask;
2140         }
2141
2142         GOTO(out, rc = count);
2143
2144 out:
2145         OBD_FREE(buf, count + 1);
2146         RETURN(rc);
2147 }
2148 LPROC_SEQ_FOPS(mdt_hsm_policy);
2149
2150 #define GENERATE_PROC_METHOD(VAR)                                       \
2151 static int mdt_hsm_##VAR##_seq_show(struct seq_file *m, void *data)     \
2152 {                                                                       \
2153         struct mdt_device       *mdt = m->private;                      \
2154         struct coordinator      *cdt = &mdt->mdt_coordinator;           \
2155         ENTRY;                                                          \
2156                                                                         \
2157         seq_printf(m, "%llu\n", (__u64)cdt->VAR);                       \
2158         RETURN(0);                                                      \
2159 }                                                                       \
2160 static ssize_t                                                          \
2161 mdt_hsm_##VAR##_seq_write(struct file *file, const char __user *buffer, \
2162                           size_t count, loff_t *off)                    \
2163                                                                         \
2164 {                                                                       \
2165         struct seq_file         *m = file->private_data;                \
2166         struct mdt_device       *mdt = m->private;                      \
2167         struct coordinator      *cdt = &mdt->mdt_coordinator;           \
2168         __s64                    val;                                   \
2169         int                      rc;                                    \
2170         ENTRY;                                                          \
2171                                                                         \
2172         rc = lprocfs_str_to_s64(buffer, count, &val);                   \
2173         if (rc)                                                         \
2174                 RETURN(rc);                                             \
2175         if (val > 0 && val < INT_MAX) {                                 \
2176                 cdt->VAR = val;                                         \
2177                 RETURN(count);                                          \
2178         }                                                               \
2179         RETURN(-EINVAL);                                                \
2180 }                                                                       \
2181
2182 GENERATE_PROC_METHOD(cdt_loop_period)
2183 GENERATE_PROC_METHOD(cdt_grace_delay)
2184 GENERATE_PROC_METHOD(cdt_active_req_timeout)
2185 GENERATE_PROC_METHOD(cdt_max_requests)
2186 GENERATE_PROC_METHOD(cdt_default_archive_id)
2187
2188 /*
2189  * procfs write method for MDT/hsm_control
2190  * proc entry is in mdt directory so data is mdt obd_device pointer
2191  */
2192 #define CDT_ENABLE_CMD   "enabled"
2193 #define CDT_STOP_CMD     "shutdown"
2194 #define CDT_DISABLE_CMD  "disabled"
2195 #define CDT_PURGE_CMD    "purge"
2196 #define CDT_HELP_CMD     "help"
2197 #define CDT_MAX_CMD_LEN  10
2198
2199 ssize_t
2200 mdt_hsm_cdt_control_seq_write(struct file *file, const char __user *buffer,
2201                               size_t count, loff_t *off)
2202 {
2203         struct seq_file         *m = file->private_data;
2204         struct obd_device       *obd = m->private;
2205         struct mdt_device       *mdt = mdt_dev(obd->obd_lu_dev);
2206         struct coordinator      *cdt = &(mdt->mdt_coordinator);
2207         int                      rc, usage = 0;
2208         char                     kernbuf[CDT_MAX_CMD_LEN];
2209         ENTRY;
2210
2211         if (count == 0 || count >= sizeof(kernbuf))
2212                 RETURN(-EINVAL);
2213
2214         if (copy_from_user(kernbuf, buffer, count))
2215                 RETURN(-EFAULT);
2216         kernbuf[count] = 0;
2217
2218         if (kernbuf[count - 1] == '\n')
2219                 kernbuf[count - 1] = 0;
2220
2221         rc = 0;
2222         if (strcmp(kernbuf, CDT_ENABLE_CMD) == 0) {
2223                 if (cdt->cdt_state == CDT_DISABLE) {
2224                         rc = set_cdt_state(cdt, CDT_RUNNING, NULL);
2225                         mdt_hsm_cdt_event(cdt);
2226                         wake_up(&cdt->cdt_waitq);
2227                 } else {
2228                         rc = mdt_hsm_cdt_start(mdt);
2229                 }
2230         } else if (strcmp(kernbuf, CDT_STOP_CMD) == 0) {
2231                 if ((cdt->cdt_state == CDT_STOPPING) ||
2232                     (cdt->cdt_state == CDT_STOPPED)) {
2233                         CERROR("%s: Coordinator already stopped\n",
2234                                mdt_obd_name(mdt));
2235                         rc = -EALREADY;
2236                 } else {
2237                         rc = mdt_hsm_cdt_stop(mdt);
2238                 }
2239         } else if (strcmp(kernbuf, CDT_DISABLE_CMD) == 0) {
2240                 if ((cdt->cdt_state == CDT_STOPPING) ||
2241                     (cdt->cdt_state == CDT_STOPPED)) {
2242                         CERROR("%s: Coordinator is stopped\n",
2243                                mdt_obd_name(mdt));
2244                         rc = -EINVAL;
2245                 } else {
2246                         rc = set_cdt_state(cdt, CDT_DISABLE, NULL);
2247                 }
2248         } else if (strcmp(kernbuf, CDT_PURGE_CMD) == 0) {
2249                 rc = hsm_cancel_all_actions(mdt);
2250         } else if (strcmp(kernbuf, CDT_HELP_CMD) == 0) {
2251                 usage = 1;
2252         } else {
2253                 usage = 1;
2254                 rc = -EINVAL;
2255         }
2256
2257         if (usage == 1)
2258                 CERROR("%s: Valid coordinator control commands are: "
2259                        "%s %s %s %s %s\n", mdt_obd_name(mdt),
2260                        CDT_ENABLE_CMD, CDT_STOP_CMD, CDT_DISABLE_CMD,
2261                        CDT_PURGE_CMD, CDT_HELP_CMD);
2262
2263         if (rc)
2264                 RETURN(rc);
2265
2266         RETURN(count);
2267 }
2268
2269 int mdt_hsm_cdt_control_seq_show(struct seq_file *m, void *data)
2270 {
2271         struct obd_device       *obd = m->private;
2272         struct coordinator      *cdt;
2273         ENTRY;
2274
2275         cdt = &(mdt_dev(obd->obd_lu_dev)->mdt_coordinator);
2276
2277         seq_printf(m, "%s\n", cdt_mdt_state2str(cdt->cdt_state));
2278
2279         RETURN(0);
2280 }
2281
2282 static int
2283 mdt_hsm_request_mask_show(struct seq_file *m, __u64 mask)
2284 {
2285         bool first = true;
2286         int i;
2287         ENTRY;
2288
2289         for (i = 0; i < 8 * sizeof(mask); i++) {
2290                 if (mask & (1UL << i)) {
2291                         seq_printf(m, "%s%s", first ? "" : " ",
2292                                    hsm_copytool_action2name(i));
2293                         first = false;
2294                 }
2295         }
2296         seq_putc(m, '\n');
2297
2298         RETURN(0);
2299 }
2300
2301 static int
2302 mdt_hsm_user_request_mask_seq_show(struct seq_file *m, void *data)
2303 {
2304         struct mdt_device *mdt = m->private;
2305         struct coordinator *cdt = &mdt->mdt_coordinator;
2306
2307         return mdt_hsm_request_mask_show(m, cdt->cdt_user_request_mask);
2308 }
2309
2310 static int
2311 mdt_hsm_group_request_mask_seq_show(struct seq_file *m, void *data)
2312 {
2313         struct mdt_device *mdt = m->private;
2314         struct coordinator *cdt = &mdt->mdt_coordinator;
2315
2316         return mdt_hsm_request_mask_show(m, cdt->cdt_group_request_mask);
2317 }
2318
2319 static int
2320 mdt_hsm_other_request_mask_seq_show(struct seq_file *m, void *data)
2321 {
2322         struct mdt_device *mdt = m->private;
2323         struct coordinator *cdt = &mdt->mdt_coordinator;
2324
2325         return mdt_hsm_request_mask_show(m, cdt->cdt_other_request_mask);
2326 }
2327
2328 static inline enum hsm_copytool_action
2329 hsm_copytool_name2action(const char *name)
2330 {
2331         if (strcasecmp(name, "NOOP") == 0)
2332                 return HSMA_NONE;
2333         else if (strcasecmp(name, "ARCHIVE") == 0)
2334                 return HSMA_ARCHIVE;
2335         else if (strcasecmp(name, "RESTORE") == 0)
2336                 return HSMA_RESTORE;
2337         else if (strcasecmp(name, "REMOVE") == 0)
2338                 return HSMA_REMOVE;
2339         else if (strcasecmp(name, "CANCEL") == 0)
2340                 return HSMA_CANCEL;
2341         else
2342                 return -1;
2343 }
2344
2345 static ssize_t
2346 mdt_write_hsm_request_mask(struct file *file, const char __user *user_buf,
2347                             size_t user_count, __u64 *mask)
2348 {
2349         char *buf, *pos, *name;
2350         size_t buf_size;
2351         __u64 new_mask = 0;
2352         int rc;
2353         ENTRY;
2354
2355         if (!(user_count < 4096))
2356                 RETURN(-ENOMEM);
2357
2358         buf_size = user_count + 1;
2359
2360         OBD_ALLOC(buf, buf_size);
2361         if (buf == NULL)
2362                 RETURN(-ENOMEM);
2363
2364         if (copy_from_user(buf, user_buf, buf_size - 1))
2365                 GOTO(out, rc = -EFAULT);
2366
2367         buf[buf_size - 1] = '\0';
2368
2369         pos = buf;
2370         while ((name = strsep(&pos, " \t\v\n")) != NULL) {
2371                 int action;
2372
2373                 if (*name == '\0')
2374                         continue;
2375
2376                 action = hsm_copytool_name2action(name);
2377                 if (action < 0)
2378                         GOTO(out, rc = -EINVAL);
2379
2380                 new_mask |= (1UL << action);
2381         }
2382
2383         *mask = new_mask;
2384         rc = user_count;
2385 out:
2386         OBD_FREE(buf, buf_size);
2387
2388         RETURN(rc);
2389 }
2390
2391 static ssize_t
2392 mdt_hsm_user_request_mask_seq_write(struct file *file, const char __user *buf,
2393                                         size_t count, loff_t *off)
2394 {
2395         struct seq_file         *m = file->private_data;
2396         struct mdt_device       *mdt = m->private;
2397         struct coordinator *cdt = &mdt->mdt_coordinator;
2398
2399         return mdt_write_hsm_request_mask(file, buf, count,
2400                                            &cdt->cdt_user_request_mask);
2401 }
2402
2403 static ssize_t
2404 mdt_hsm_group_request_mask_seq_write(struct file *file, const char __user *buf,
2405                                         size_t count, loff_t *off)
2406 {
2407         struct seq_file         *m = file->private_data;
2408         struct mdt_device       *mdt = m->private;
2409         struct coordinator      *cdt = &mdt->mdt_coordinator;
2410
2411         return mdt_write_hsm_request_mask(file, buf, count,
2412                                            &cdt->cdt_group_request_mask);
2413 }
2414
2415 static ssize_t
2416 mdt_hsm_other_request_mask_seq_write(struct file *file, const char __user *buf,
2417                                         size_t count, loff_t *off)
2418 {
2419         struct seq_file         *m = file->private_data;
2420         struct mdt_device       *mdt = m->private;
2421         struct coordinator      *cdt = &mdt->mdt_coordinator;
2422
2423         return mdt_write_hsm_request_mask(file, buf, count,
2424                                            &cdt->cdt_other_request_mask);
2425 }
2426
2427 static int mdt_hsm_cdt_raolu_seq_show(struct seq_file *m, void *data)
2428 {
2429         struct mdt_device *mdt = m->private;
2430         struct coordinator *cdt = &mdt->mdt_coordinator;
2431         ENTRY;
2432
2433         seq_printf(m, "%d\n", (int)cdt->cdt_remove_archive_on_last_unlink);
2434         RETURN(0);
2435 }
2436
2437 static ssize_t
2438 mdt_hsm_cdt_raolu_seq_write(struct file *file, const char __user *buffer,
2439                             size_t count, loff_t *off)
2440
2441 {
2442         struct seq_file *m = file->private_data;
2443         struct mdt_device *mdt = m->private;
2444         struct coordinator *cdt = &mdt->mdt_coordinator;
2445         __s64 val;
2446         int rc;
2447         ENTRY;
2448
2449         rc = lprocfs_str_to_s64(buffer, count, &val);
2450         if (rc < 0)
2451                 RETURN(rc);
2452
2453         cdt->cdt_remove_archive_on_last_unlink = val;
2454         RETURN(count);
2455 }
2456
/*
 * Instantiate the <name>_fops objects referenced by lprocfs_mdt_hsm_vars[]
 * for the handlers defined in this file.
 */

/* coordinator tunables */
LPROC_SEQ_FOPS(mdt_hsm_cdt_loop_period);
LPROC_SEQ_FOPS(mdt_hsm_cdt_grace_delay);
LPROC_SEQ_FOPS(mdt_hsm_cdt_active_req_timeout);
LPROC_SEQ_FOPS(mdt_hsm_cdt_max_requests);
LPROC_SEQ_FOPS(mdt_hsm_cdt_default_archive_id);
LPROC_SEQ_FOPS(mdt_hsm_cdt_raolu);

/* per-class request masks */
LPROC_SEQ_FOPS(mdt_hsm_user_request_mask);
LPROC_SEQ_FOPS(mdt_hsm_group_request_mask);
LPROC_SEQ_FOPS(mdt_hsm_other_request_mask);
2466
2467 static struct lprocfs_vars lprocfs_mdt_hsm_vars[] = {
2468         { .name =       "agents",
2469           .fops =       &mdt_hsm_agent_fops                     },
2470         { .name =       "actions",
2471           .fops =       &mdt_hsm_actions_fops,
2472           .proc_mode =  0444                                    },
2473         { .name =       "default_archive_id",
2474           .fops =       &mdt_hsm_cdt_default_archive_id_fops    },
2475         { .name =       "grace_delay",
2476           .fops =       &mdt_hsm_cdt_grace_delay_fops           },
2477         { .name =       "loop_period",
2478           .fops =       &mdt_hsm_cdt_loop_period_fops           },
2479         { .name =       "max_requests",
2480           .fops =       &mdt_hsm_cdt_max_requests_fops          },
2481         { .name =       "policy",
2482           .fops =       &mdt_hsm_policy_fops                    },
2483         { .name =       "active_request_timeout",
2484           .fops =       &mdt_hsm_cdt_active_req_timeout_fops    },
2485         { .name =       "active_requests",
2486           .fops =       &mdt_hsm_active_requests_fops           },
2487         { .name =       "user_request_mask",
2488           .fops =       &mdt_hsm_user_request_mask_fops,        },
2489         { .name =       "group_request_mask",
2490           .fops =       &mdt_hsm_group_request_mask_fops,       },
2491         { .name =       "other_request_mask",
2492           .fops =       &mdt_hsm_other_request_mask_fops,       },
2493         { .name =       "remove_archive_on_last_unlink",
2494           .fops =       &mdt_hsm_cdt_raolu_fops,                },
2495         { 0 }
2496 };