Whamcloud - gitweb
LU-9633 ptlrpc: Add kernel doc style for ptlrpc (14)
[fs/lustre-release.git] / lustre / mdt / mdt_coordinator.c
1 // SPDX-License-Identifier: GPL-2.0
2
3 /*
4  * Copyright (c) 2011, 2012 Commissariat a l'energie atomique et aux energies
5  *                          alternatives
6  *
7  * Copyright (c) 2013, 2017, Intel Corporation.
8  * Use is subject to license terms.
9  */
10
11 /*
12  * Lustre HSM Coordinator
13  *
14  * Author: Jacques-Charles Lafoucriere <jacques-charles.lafoucriere@cea.fr>
15  * Author: Aurelien Degremont <aurelien.degremont@cea.fr>
16  * Author: Thomas Leibovici <thomas.leibovici@cea.fr>
17  */
18
19 #define DEBUG_SUBSYSTEM S_MDS
20
21 #include <linux/kthread.h>
22 #include <linux/kernel.h>
23 #include <obd_support.h>
24 #include <lustre_export.h>
25 #include <obd.h>
26 #include <lprocfs_status.h>
27 #include <lustre_log.h>
28 #include <lustre_kernelcomm.h>
29 #include "mdt_internal.h"
30
31 /**
32  * get obj and HSM attributes on a fid
33  * \param mti [IN] context
34  * \param fid [IN] object fid
35  * \param hsm [OUT] HSM meta data
36  * \retval obj or error (-ENOENT if not found)
37  */
38 struct mdt_object *mdt_hsm_get_md_hsm(struct mdt_thread_info *mti,
39                                       const struct lu_fid *fid,
40                                       struct md_hsm *hsm)
41 {
42         struct md_attr          *ma;
43         struct mdt_object       *obj;
44         int                      rc;
45         ENTRY;
46
47         ma = &mti->mti_attr;
48         ma->ma_need = MA_HSM;
49         ma->ma_valid = 0;
50
51         /* find object by FID */
52         obj = mdt_object_find(mti->mti_env, mti->mti_mdt, fid);
53         if (IS_ERR(obj))
54                 RETURN(obj);
55
56         if (!mdt_object_exists(obj)) {
57                 /* no more object */
58                 mdt_object_put(mti->mti_env, obj);
59                 RETURN(ERR_PTR(-ENOENT));
60         }
61
62         rc = mdt_attr_get_complex(mti, obj, ma);
63         if (rc) {
64                 mdt_object_put(mti->mti_env, obj);
65                 RETURN(ERR_PTR(rc));
66         }
67
68         if (ma->ma_valid & MA_HSM)
69                 *hsm = ma->ma_hsm;
70         else
71                 memset(hsm, 0, sizeof(*hsm));
72         ma->ma_valid = 0;
73         RETURN(obj);
74 }
75
76 void mdt_hsm_dump_hal(int level, const char *prefix,
77                       struct hsm_action_list *hal)
78 {
79         int                      i, sz;
80         struct hsm_action_item  *hai;
81         char                     buf[12];
82
83         CDEBUG(level, "%s: HAL header: version %X count %d"
84                       " archive_id %d flags %#llx\n",
85                prefix, hal->hal_version, hal->hal_count,
86                hal->hal_archive_id, hal->hal_flags);
87
88         hai = hai_first(hal);
89         for (i = 0; i < hal->hal_count; i++) {
90                 sz = hai->hai_len - sizeof(*hai);
91                 CDEBUG(level, "%s %d: fid="DFID" dfid="DFID
92                        " cookie=%#llx"
93                        " action=%s extent=%#llx-%#llx gid=%#llx"
94                        " datalen=%d data=[%s]\n",
95                        prefix, i,
96                        PFID(&hai->hai_fid), PFID(&hai->hai_dfid),
97                        hai->hai_cookie,
98                        hsm_copytool_action2name(hai->hai_action),
99                        hai->hai_extent.offset,
100                        hai->hai_extent.length,
101                        hai->hai_gid, sz,
102                        hai_dump_data_field(hai, buf, sizeof(buf)));
103                 hai = hai_next(hai);
104         }
105 }
106
/* State shared between the coordinator thread and its llog scan callbacks
 * for the duration of one pass over the HSM action llog.
 */
struct hsm_scan_data {
        /* thread info of the coordinator; callbacks reach the mdt device
         * through hsd_mti->mti_mdt
         */
        struct mdt_thread_info  *hsd_mti;
        /* filesystem name; each scan request's hsr_fsname points here */
        char                     hsd_fsname[MTI_NAME_MAXLEN + 1];
        /* are we scanning the logs for housekeeping, or just looking
         * for new work?
         */
        bool                     hsd_housekeeping;
        /* set once a HSMA_RESTORE record has been queued in this scan */
        bool                     hsd_one_restore;
        /* catalog / record index recorded by the waiting-record callback;
         * used as the starting point of non-housekeeping scans
         */
        u32                      hsd_start_cat_idx;
        u32                      hsd_start_rec_idx;
        /* number of records queued in hsd_request[] during this scan */
        int                      hsd_action_count;
        u64                      hsd_request_len; /* array alloc len */
        u64                      hsd_request_count; /* array used count */
        /* array of per-archive request batches built by the scan */
        struct hsm_scan_request *hsd_request;
};
122
123 static inline int mdt_cdt_check_rec(struct llog_agent_req_rec *rec, size_t size)
124 {
125         ENTRY;
126
127         if (rec->arr_hdr.lrh_len > size ||
128             rec->arr_hdr.lrh_len < sizeof(*rec) ||
129             rec->arr_hdr.lrh_type != HSM_AGENT_REC ||
130             rec->arr_hdr.lrh_index !=
131             llog_get_rec_tail(&rec->arr_hdr)->lrt_index ||
132             rec->arr_hdr.lrh_len != llog_get_rec_tail(&rec->arr_hdr)->lrt_len)
133                 RETURN(-EBADR);
134
135         if (rec->arr_status > ARS_SUCCEED)
136                 RETURN(-EBADR);
137
138         if (rec->arr_hai.hai_len > rec->arr_hdr.lrh_len ||
139             rec->arr_hai.hai_len < sizeof(rec->arr_hai) ||
140             (rec->arr_hai.hai_action != HSMA_NONE &&
141              rec->arr_hai.hai_action < HSMA_ARCHIVE &&
142              rec->arr_hai.hai_action > HSMA_CANCEL) ||
143             rec->arr_hai.hai_cookie == 0)
144                 RETURN(-EBADR);
145
146         RETURN(0);
147 }
148
149 static int mdt_cdt_waiting_cb(const struct lu_env *env,
150                               struct mdt_device *mdt,
151                               struct llog_handle *llh,
152                               struct llog_agent_req_rec *larr,
153                               struct hsm_scan_data *hsd)
154 {
155         struct coordinator *cdt = &mdt->mdt_coordinator;
156         struct hsm_scan_request *request;
157         struct cdt_agent_req *car;
158         struct obd_uuid uuid = {.uuid = {0}};
159         struct llog_cookie cookie;
160         size_t hai_size;
161         u32 archive_id;
162         bool wrapped;
163         int i;
164
165         /* Are agents full? */
166         if (atomic_read(&cdt->cdt_request_count) >= cdt->cdt_max_requests)
167                 RETURN(hsd->hsd_housekeeping ? 0 : LLOG_PROC_BREAK);
168
169         if (hsd->hsd_action_count + atomic_read(&cdt->cdt_request_count) >=
170             cdt->cdt_max_requests) {
171                 /* We cannot send any more request
172                  *
173                  *                     *** SPECIAL CASE ***
174                  *
175                  * Restore requests are too important not to schedule at least
176                  * one, everytime we can.
177                  */
178                 if (larr->arr_hai.hai_action != HSMA_RESTORE ||
179                     hsd->hsd_one_restore)
180                         RETURN(hsd->hsd_housekeeping ? 0 : LLOG_PROC_BREAK);
181         }
182
183         hai_size = round_up(larr->arr_hai.hai_len, 8);
184         archive_id = larr->arr_archive_id;
185
186         request = NULL;
187         for (i = 0; i < hsd->hsd_request_count; i++) {
188                 if (hsr_get_archive_id(&hsd->hsd_request[i]) == archive_id &&
189                     hsd->hsd_request[i].hsr_used_sz + hai_size <=
190                     LDLM_MAXREQSIZE) {
191                         request = &hsd->hsd_request[i];
192                         break;
193                 }
194         }
195
196         /* Are we trying to force-schedule a request? */
197         if (hsd->hsd_action_count + atomic_read(&cdt->cdt_request_count) >=
198             cdt->cdt_max_requests) {
199                 /* Is there really no compatible hsm_scan_request? */
200                 if (!request) {
201                         for (i -= 1; i >= 0; i--) {
202                                 if (hsr_get_archive_id(&hsd->hsd_request[i]) ==
203                                     archive_id) {
204                                         request = &hsd->hsd_request[i];
205                                         break;
206                                 }
207                         }
208                 }
209
210                 /* Make room for the car */
211                 if (request) {
212                         /* Discard the last car until there is enough space */
213                         do {
214                                 request->hsr_count--;
215
216                                 car = list_last_entry(&request->hsr_cars,
217                                                       struct cdt_agent_req,
218                                                       car_scan_list);
219                                 list_del_init(&car->car_scan_list);
220
221                                 request->hsr_used_sz -=
222                                         round_up(car->car_hai.hai_len, 8);
223
224                                 mdt_cdt_put_request(car);
225                                 hsd->hsd_action_count--;
226                         } while (request->hsr_used_sz + hai_size >
227                                  LDLM_MAXREQSIZE);
228                 } else if (hsd->hsd_housekeeping) {
229                         struct hsm_scan_request *tmp;
230                         struct cdt_agent_req *pos;
231                         struct cdt_agent_req *tmp2;
232
233                         /* Discard the (whole) records from request */
234                         hsd->hsd_request_count--;
235                         LASSERT(hsd->hsd_request_count >= 0);
236                         tmp = &hsd->hsd_request[hsd->hsd_request_count];
237                         hsd->hsd_action_count -= tmp->hsr_count;
238                         LASSERT(hsd->hsd_action_count >= 0);
239                         list_for_each_entry_safe(pos, tmp2, &tmp->hsr_cars,
240                                                  car_scan_list) {
241                                 list_del_init(&pos->car_scan_list);
242                                 mdt_cdt_put_request(pos);
243                         }
244                 } else {
245                         /* Bailing out, this code path is too hot */
246                         RETURN(LLOG_PROC_BREAK);
247
248                 }
249         }
250
251         if (!request) {
252                 LASSERT(hsd->hsd_request_count < hsd->hsd_request_len);
253                 request = &hsd->hsd_request[hsd->hsd_request_count];
254
255                 INIT_LIST_HEAD(&request->hsr_cars);
256                 request->hsr_fsname = &hsd->hsd_fsname[0];
257                 request->hsr_version = HAL_VERSION;
258                 request->hsr_count = 0;
259                 request->hsr_used_sz = sizeof(struct hsm_action_list) +
260                         __ALIGN_KERNEL(strlen(hsd->hsd_fsname) + 1, 8);
261                 hsd->hsd_request_count++;
262         }
263
264         cookie.lgc_offset = 0;
265         llog_get_cookie(env, &cookie);
266         LASSERTF(cookie.lgc_offset >= llh->lgh_hdr->llh_size,
267                  "Bad record offset %llx for idx %d", cookie.lgc_offset,
268                  larr->arr_hdr.lrh_index);
269
270         car = mdt_cdt_alloc_request(&uuid, larr);
271         if (IS_ERR(car))
272                 RETURN(PTR_ERR(car));
273
274         car->car_hmm->mr_offset = cookie.lgc_offset;
275         car->car_hmm->mr_lid = llh->lgh_id;
276         list_add_tail(&car->car_scan_list, &request->hsr_cars);
277
278         CDEBUG(D_HSM, "Copying record %d to request %px, count %d\n",
279                larr->arr_hdr.lrh_index, request, request->hsr_count);
280
281         request->hsr_count++;
282         request->hsr_used_sz += hai_size;
283         hsd->hsd_action_count++;
284
285         switch (car->car_hmm->mr_rec.arr_hai.hai_action) {
286         case HSMA_CANCEL:
287                 break;
288         case HSMA_RESTORE:
289                 hsd->hsd_one_restore = true;
290                 fallthrough;
291         default:
292                 break;
293         }
294
295         wrapped = llh->lgh_hdr->llh_cat_idx >= llh->lgh_last_idx &&
296                   llh->lgh_hdr->llh_count > 1;
297         if ((!wrapped && llh->lgh_hdr->llh_cat_idx > hsd->hsd_start_cat_idx) ||
298             (wrapped && llh->lgh_hdr->llh_cat_idx < hsd->hsd_start_cat_idx) ||
299             (llh->lgh_hdr->llh_cat_idx == hsd->hsd_start_cat_idx &&
300              larr->arr_hdr.lrh_index > hsd->hsd_start_rec_idx)) {
301                 hsd->hsd_start_cat_idx = llh->lgh_hdr->llh_cat_idx;
302                 hsd->hsd_start_rec_idx = larr->arr_hdr.lrh_index;
303         }
304
305         RETURN(0);
306 }
307
308 static int mdt_cdt_started_cb(const struct lu_env *env,
309                               struct mdt_device *mdt,
310                               struct llog_handle *llh,
311                               struct llog_agent_req_rec *larr,
312                               struct hsm_scan_data *hsd)
313 {
314         struct coordinator *cdt = &mdt->mdt_coordinator;
315         struct hsm_action_item *hai = &larr->arr_hai;
316         struct cdt_agent_req *car;
317         time64_t now = ktime_get_real_seconds();
318         time64_t last;
319         enum changelog_rec_flags clf_flags;
320         int rc;
321
322         if (!hsd->hsd_housekeeping)
323                 RETURN(0);
324
325         /* we search for a running request
326          * error may happen if coordinator crashes or stopped
327          * with running request
328          */
329         car = mdt_cdt_find_request(cdt, hai->hai_cookie);
330         if (car == NULL) {
331                 last = larr->arr_req_change;
332         } else {
333                 last = car->car_req_update;
334         }
335
336         /* test if request too long, if yes cancel it
337          * the same way the copy tool acknowledge a cancel request */
338         if (now <= last + cdt->cdt_active_req_timeout)
339                 GOTO(out_car, rc = 0);
340
341         dump_llog_agent_req_rec("request timed out, start cleaning", larr);
342
343         if (car != NULL) {
344                 car->car_req_update = now;
345                 mdt_hsm_agent_update_statistics(cdt, 0, 1, 0, &car->car_uuid);
346                 /* Remove car from memory list (LU-9075) */
347                 mdt_cdt_remove_request(cdt, hai->hai_cookie);
348         }
349
350         /* Emit a changelog record for the failed action.*/
351         clf_flags = 0;
352         hsm_set_cl_error(&clf_flags, ECANCELED);
353
354         switch (hai->hai_action) {
355         case HSMA_ARCHIVE:
356                 hsm_set_cl_event(&clf_flags, HE_ARCHIVE);
357                 break;
358         case HSMA_RESTORE:
359                 hsm_set_cl_event(&clf_flags, HE_RESTORE);
360                 break;
361         case HSMA_REMOVE:
362                 hsm_set_cl_event(&clf_flags, HE_REMOVE);
363                 break;
364         case HSMA_CANCEL:
365                 hsm_set_cl_event(&clf_flags, HE_CANCEL);
366                 break;
367         default:
368                 /* Unknown record type, skip changelog. */
369                 clf_flags = 0;
370                 break;
371         }
372
373         if (clf_flags != 0)
374                 mo_changelog(env, CL_HSM, clf_flags, mdt->mdt_child,
375                              &hai->hai_fid);
376
377         if (hai->hai_action == HSMA_RESTORE)
378                 cdt_restore_handle_del(hsd->hsd_mti, cdt, &hai->hai_fid);
379
380         larr->arr_status = ARS_CANCELED;
381         larr->arr_req_change = now;
382         rc = llog_write(env, llh, &larr->arr_hdr, larr->arr_hdr.lrh_index);
383         if (rc < 0) {
384                 CERROR("%s: cannot update agent log: rc = %d\n",
385                        mdt_obd_name(mdt), rc);
386                 rc = LLOG_DEL_RECORD;
387         }
388
389         /* ct has completed a request, so a slot is available,
390          * signal the coordinator to find new work */
391         mdt_hsm_cdt_event(cdt);
392 out_car:
393         if (car != NULL)
394                 mdt_cdt_put_request(car);
395
396         RETURN(rc);
397 }
398
399 /**
400  *  llog_cat_process() callback, used to:
401  *  - find waiting request and start action
402  *  - purge canceled and done requests
403  * \param env [IN] environment
404  * \param llh [IN] llog handle
405  * \param hdr [IN] llog record
406  * \param data [IN/OUT] cb data = struct hsm_scan_data
407  * \retval 0 success
408  * \retval -ve failure
409  */
410 static int mdt_coordinator_cb(const struct lu_env *env,
411                               struct llog_handle *llh,
412                               struct llog_rec_hdr *hdr,
413                               void *data)
414 {
415         struct llog_agent_req_rec *larr = (struct llog_agent_req_rec *)hdr;
416         struct hsm_scan_data *hsd = data;
417         struct mdt_device *mdt = hsd->hsd_mti->mti_mdt;
418         struct coordinator *cdt = &mdt->mdt_coordinator;
419         int rc;
420
421         ENTRY;
422
423         if (cdt->cdt_state == CDT_DISABLE)
424                 RETURN(-ECANCELED);
425
426         larr = (struct llog_agent_req_rec *)hdr;
427         rc = mdt_cdt_check_rec(larr, larr->arr_hdr.lrh_len);
428         if (rc) {
429                 CDEBUG(D_HSM, "%s: bad llog record "DOSTID" idx %d, rc = %d\n",
430                        llh->lgh_ctxt->loc_obd->obd_name,
431                        POSTID(&llh->lgh_id.lgl_oi), larr->arr_hdr.lrh_index,
432                        rc);
433                 RETURN(LLOG_DEL_RECORD);
434         }
435         dump_llog_agent_req_rec("mdt_coordinator_cb(): ", larr);
436         switch (larr->arr_status) {
437         case ARS_WAITING:
438                 RETURN(mdt_cdt_waiting_cb(env, mdt, llh, larr, hsd));
439         case ARS_STARTED:
440                 RETURN(mdt_cdt_started_cb(env, mdt, llh, larr, hsd));
441         default:
442                 if (!hsd->hsd_housekeeping)
443                         RETURN(0);
444
445                 if ((larr->arr_req_change + cdt->cdt_grace_delay) <
446                     ktime_get_real_seconds()) {
447                         RETURN(LLOG_DEL_RECORD);
448                 }
449
450                 RETURN(0);
451         }
452 }
453
454 static void cdt_crh_free(struct rcu_head *head)
455 {
456         struct cdt_restore_handle *crh;
457
458         crh = container_of(head, struct cdt_restore_handle, crh_rcu);
459         OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
460 }
461
462 static void
463 cdt_crh_put(struct cdt_restore_handle *crh, struct mdt_thread_info *cdt_mti)
464 {
465         if (atomic_dec_and_test(&crh->crh_refc)) {
466                 /* XXX We pass a NULL object since the restore handle does not
467                  * keep a reference on the object being restored.
468                  */
469                 if (lustre_handle_is_used(&crh->crh_lh.mlh_reg_lh))
470                         mdt_object_unlock(cdt_mti, NULL, &crh->crh_lh, 1);
471                 call_rcu(&crh->crh_rcu, cdt_crh_free);
472         }
473 }
474
/* Per-element destructor handed to rhashtable_free_and_destroy():
 * \a vcrh is the restore handle, \a vcdt_mti the coordinator thread info.
 */
static void crh_free_hash(void *vcrh, void *vcdt_mti)
{
        /* put last reference */
        cdt_crh_put(vcrh, vcdt_mti);
}
483
484 static const struct rhashtable_params crh_hash_params = {
485         .key_len        = sizeof(struct lu_fid),
486         .key_offset     = offsetof(struct cdt_restore_handle, crh_fid),
487         .head_offset    = offsetof(struct cdt_restore_handle, crh_hash),
488         .hashfn         = lu_fid_hash,
489         .automatic_shrinking = true,
490 };
491
492 /* Release the ressource used by the coordinator. Called when the
493  * coordinator is stopping. */
494 static void mdt_hsm_cdt_cleanup(struct mdt_device *mdt)
495 {
496         struct coordinator              *cdt = &mdt->mdt_coordinator;
497         struct cdt_agent_req            *car, *tmp1;
498         struct hsm_agent                *ha, *tmp2;
499         struct mdt_thread_info          *cdt_mti;
500
501         /* start cleaning */
502         down_write(&cdt->cdt_request_lock);
503         list_for_each_entry_safe(car, tmp1, &cdt->cdt_request_list,
504                                  car_request_list) {
505                 cfs_hash_del(cdt->cdt_request_cookie_hash,
506                              &car->car_hai.hai_cookie,
507                              &car->car_cookie_hash);
508                 list_del(&car->car_request_list);
509                 mdt_cdt_put_request(car);
510         }
511         up_write(&cdt->cdt_request_lock);
512
513         down_write(&cdt->cdt_agent_lock);
514         list_for_each_entry_safe(ha, tmp2, &cdt->cdt_agents, ha_list) {
515                 list_del(&ha->ha_list);
516                 if (ha->ha_archive_cnt != 0)
517                         OBD_FREE_PTR_ARRAY(ha->ha_archive_id,
518                                            ha->ha_archive_cnt);
519                 OBD_FREE_PTR(ha);
520         }
521         up_write(&cdt->cdt_agent_lock);
522
523         cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
524         rhashtable_free_and_destroy(&cdt->cdt_restore_hash, crh_free_hash,
525                                     cdt_mti);
526         rcu_barrier();
527 }
528
529 /*
530  * Coordinator state transition table, indexed on enum cdt_states, taking
531  * from and to states. For instance since CDT_INIT to CDT_RUNNING is a
532  * valid transition, cdt_transition[CDT_INIT][CDT_RUNNING] is true.
533  */
534 static bool cdt_transition[CDT_STATES_COUNT][CDT_STATES_COUNT] = {
535         /* from -> to:    stopped init   running disable stopping */
536         /* stopped */   { true,   true,  false,  false,  false },
537         /* init */      { true,   false, true,   false,  false },
538         /* running */   { false,  false, true,   true,   true },
539         /* disable */   { false,  false, true,   true,   true },
540         /* stopping */  { true,   false, false,  false,  false }
541 };
542
543 /**
544  * Change coordinator thread state
545  * Some combinations are not valid, so catch them here.
546  *
547  * Returns 0 on success, with old_state set if not NULL, or -EINVAL if
548  * the transition was not possible.
549  */
550 static int set_cdt_state_locked(struct coordinator *cdt,
551                                 enum cdt_states new_state)
552 {
553         int rc;
554         enum cdt_states state;
555
556         state = cdt->cdt_state;
557
558         if (cdt_transition[state][new_state]) {
559                 cdt->cdt_state = new_state;
560                 rc = 0;
561         } else {
562                 CDEBUG(D_HSM,
563                        "unexpected coordinator transition, from=%s, to=%s\n",
564                        cdt_mdt_state2str(state), cdt_mdt_state2str(new_state));
565                 rc = -EINVAL;
566         }
567
568         return rc;
569 }
570
571 static int set_cdt_state(struct coordinator *cdt, enum cdt_states new_state)
572 {
573         int rc;
574
575         mutex_lock(&cdt->cdt_state_lock);
576         rc = set_cdt_state_locked(cdt, new_state);
577         mutex_unlock(&cdt->cdt_state_lock);
578
579         return rc;
580 }
581
582 int cdt_getref_try(struct coordinator *cdt)
583 {
584         return refcount_inc_not_zero(&cdt->cdt_ref);
585 }
586
587 void cdt_putref(struct coordinator *cdt)
588 {
589         if (refcount_dec_and_test(&cdt->cdt_ref))
590                 wake_up(&cdt->cdt_waitq);
591 }
592
593 static int mdt_hsm_pending_restore(struct mdt_thread_info *mti);
594
595 static int cdt_start_pending_restore(struct mdt_device *mdt,
596                                      struct coordinator *cdt)
597 {
598         struct mdt_thread_info *cdt_mti;
599         unsigned int i = 0;
600         int rc;
601
602         /* wait until MDD initialize hsm actions llog */
603         while (!test_bit(MDT_FL_CFGLOG, &mdt->mdt_state) && i < obd_timeout) {
604                 schedule_timeout_interruptible(cfs_time_seconds(1));
605                 if (kthread_should_stop())
606                         return -ESHUTDOWN;
607                 i++;
608         }
609         if (!test_bit(MDT_FL_CFGLOG, &mdt->mdt_state))
610                 CWARN("%s: trying to init HSM before MDD\n", mdt_obd_name(mdt));
611
612         /* set up list of started restore requests */
613         cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
614         rc = mdt_hsm_pending_restore(cdt_mti);
615         if (rc)
616                 CERROR("%s: cannot take the layout locks needed for registered restore: %d\n",
617                        mdt_obd_name(mdt), rc);
618
619         return rc;
620 }
621
622 /**
623  * coordinator thread
624  * \param data [IN] obd device
625  * \retval 0 success
626  * \retval -ve failure
627  */
628 static int mdt_coordinator(void *data)
629 {
630         struct mdt_thread_info  *mti = data;
631         struct mdt_device       *mdt = mti->mti_mdt;
632         struct coordinator      *cdt = &mdt->mdt_coordinator;
633         struct hsm_scan_data     hsd = { NULL };
634         time64_t                 last_housekeeping = 0;
635         int rc;
636         ENTRY;
637
638         CDEBUG(D_HSM, "%s: coordinator thread starting, pid=%d\n",
639                mdt_obd_name(mdt), current->pid);
640
641         hsd.hsd_mti = mti;
642         obd_uuid2fsname(hsd.hsd_fsname, mdt_obd_name(mdt),
643                         sizeof(hsd.hsd_fsname));
644
645         set_cdt_state(cdt, CDT_RUNNING);
646
647         /* Inform mdt_hsm_cdt_start(). */
648         wake_up(&cdt->cdt_waitq);
649
650         /* this initilazes cdt_last_cookie too */
651         rc = cdt_start_pending_restore(mdt, cdt);
652         if (rc < 0 || kthread_should_stop())
653                 GOTO(fail_to_start, rc);
654
655         refcount_set(&cdt->cdt_ref, 1);
656
657         while (1) {
658                 int i;
659                 u32 start_cat_idx;
660                 u32 start_rec_idx;
661
662                 if (cdt->cdt_state == CDT_DISABLE) {
663                         cdt->cdt_idle = true;
664                         wake_up(&cdt->cdt_cancel_all);
665                 }
666                 /* Limit execution of the expensive requests traversal
667                  * to at most one second. This prevents repeatedly
668                  * locking/unlocking the catalog for each request
669                  * and preventing other HSM operations from happening
670                  */
671                 wait_event_interruptible_timeout(cdt->cdt_waitq,
672                                                  kthread_should_stop() ||
673                                                  cdt->cdt_wakeup_coordinator,
674                                                  cfs_time_seconds(1));
675
676                 cdt->cdt_wakeup_coordinator = false;
677                 CDEBUG(D_HSM, "coordinator resumes\n");
678
679                 if (kthread_should_stop()) {
680                         CDEBUG(D_HSM, "Coordinator stops\n");
681
682                         /* Drop the running ref */
683                         cdt_putref(cdt);
684                         /* Wait threads to finish */
685                         wait_event(cdt->cdt_waitq,
686                                    refcount_read(&cdt->cdt_ref) == 0);
687                         rc = 0;
688                         break;
689                 }
690
691                 /* if coordinator is suspended continue to wait */
692                 if (cdt->cdt_state == CDT_DISABLE) {
693                         CDEBUG(D_HSM, "disable state, coordinator sleeps\n");
694                         continue;
695                 }
696
697                 cdt->cdt_idle = false;
698                 /* If no event, and no housekeeping to do, continue to
699                  * wait. */
700                 if (last_housekeeping + cdt->cdt_loop_period <=
701                     ktime_get_real_seconds()) {
702                         last_housekeeping = ktime_get_real_seconds();
703                         hsd.hsd_housekeeping = true;
704                         start_cat_idx = 0;
705                         start_rec_idx = 0;
706                 } else if (cdt->cdt_event) {
707                         hsd.hsd_housekeeping = false;
708                         start_cat_idx = hsd.hsd_start_cat_idx;
709                         start_rec_idx = hsd.hsd_start_rec_idx;
710                 } else {
711                         continue;
712                 }
713
714                 cdt->cdt_event = false;
715
716                 CDEBUG(D_HSM, "coordinator starts reading llog\n");
717
718                 if (hsd.hsd_request_len != cdt->cdt_max_requests) {
719                         /* cdt_max_requests has changed,
720                          * we need to allocate a new buffer
721                          */
722                         struct hsm_scan_request *tmp = NULL;
723                         u64 max_requests = cdt->cdt_max_requests;
724
725                         OBD_ALLOC_PTR_ARRAY_LARGE(tmp, max_requests);
726                         if (!tmp) {
727                                 CERROR("%s: error resizing buffer to %llu, keep %llu: rc = %d\n",
728                                        mdt_obd_name(mdt), max_requests,
729                                        hsd.hsd_request_len, -ENOMEM);
730                         } else {
731                                 if (hsd.hsd_request != NULL)
732                                         OBD_FREE_PTR_ARRAY_LARGE(
733                                                 hsd.hsd_request,
734                                                 hsd.hsd_request_len);
735
736                                 hsd.hsd_request_len = max_requests;
737                                 hsd.hsd_request = tmp;
738                         }
739                 }
740
741                 hsd.hsd_action_count = 0;
742                 hsd.hsd_request_count = 0;
743                 hsd.hsd_one_restore = false;
744
745                 rc = cdt_llog_process(mti->mti_env, mdt, mdt_coordinator_cb,
746                                       &hsd, start_cat_idx, start_rec_idx);
747                 if (rc < 0)
748                         goto clean_cb_alloc;
749
750                 CDEBUG(D_HSM, "found %llu requests to send\n",
751                        hsd.hsd_request_count);
752
753                 if (list_empty(&cdt->cdt_agents)) {
754                         CDEBUG(D_HSM, "no agent available, "
755                                       "coordinator sleeps\n");
756                         /* reset HSM scanning index range. */
757                         hsd.hsd_start_cat_idx = start_cat_idx;
758                         hsd.hsd_start_rec_idx = start_rec_idx;
759                         goto clean_cb_alloc;
760                 }
761
762                 /* here hsd contains a list of requests to be started */
763                 for (i = 0; i < hsd.hsd_request_count; i++) {
764                         struct hsm_scan_request *request = &hsd.hsd_request[i];
765
766                         /* still room for work ? */
767                         if (atomic_read(&cdt->cdt_request_count) >=
768                             cdt->cdt_max_requests)
769                                 break;
770
771                         /* if cancels happen during llog process or sending
772                          * assumes that other records are cancelled
773                          */
774                         if (cdt->cdt_state == CDT_DISABLE)
775                                 goto clean_cb_alloc;
776
777                         rc = mdt_hsm_agent_send(mti, request, 0);
778                         /* if failure, we suppose it is temporary
779                          * if the copy tool failed to do the request
780                          * it has to use hsm_progress
781                          */
782
783                         /* TODO: narrow down the HSM action range that already
784                          * scanned accroding to the cookies when a failure
785                          * occurs.
786                          */
787                         if (rc) {
788                                 hsd.hsd_start_cat_idx = start_cat_idx;
789                                 hsd.hsd_start_rec_idx = start_rec_idx;
790                         }
791                 }
792
793 clean_cb_alloc:
794                 /* free hal allocated by callback */
795                 for (i = 0; i < hsd.hsd_request_count; i++) {
796                         struct hsm_scan_request *request = &hsd.hsd_request[i];
797                         struct cdt_agent_req *pos;
798                         struct cdt_agent_req *tmp;
799
800                         list_for_each_entry_safe(pos, tmp, &request->hsr_cars,
801                                                  car_scan_list) {
802                                 list_del_init(&pos->car_scan_list);
803                                 mdt_cdt_put_request(pos);
804                         }
805
806                 }
807         }
808
809         if (hsd.hsd_request != NULL)
810                 OBD_FREE_PTR_ARRAY_LARGE(hsd.hsd_request, hsd.hsd_request_len);
811
812 fail_to_start:
813         mdt_hsm_cdt_cleanup(mdt);
814
815         if (rc != 0)
816                 CERROR("%s: coordinator thread exiting, process=%d, rc=%d\n",
817                        mdt_obd_name(mdt), current->pid, rc);
818         else
819                 CDEBUG(D_HSM, "%s: coordinator thread exiting, process=%d,"
820                               " no error\n",
821                        mdt_obd_name(mdt), current->pid);
822
823         /* Clear cdt_task under lock to avoid race with mdt_hsm_cdt_stop() */
824         mutex_lock(&cdt->cdt_state_lock);
825         cdt->cdt_task = NULL;
826         set_cdt_state_locked(cdt, CDT_STOPPED);
827         mutex_unlock(&cdt->cdt_state_lock);
828
829         /* Inform mdt_hsm_cdt_stop(). */
830         wake_up(&cdt->cdt_waitq);
831
832         RETURN(rc);
833 }
834
835 /**
836  * register a new HSM restore handle for a file and take EX lock on the layout
837  * \param mti [IN] thread info
838  * \param cdt [IN] coordinator
839  * \param fid [IN] fid of the file to restore
840  * \param he  [IN] HSM extent
841  * \retval 0 success
842  * \retval 1 restore handle already exists for the fid
843  * \retval -ve failure
844  */
845 int cdt_restore_handle_add(struct mdt_thread_info *mti, struct coordinator *cdt,
846                            const struct lu_fid *fid,
847                            const struct hsm_extent *he)
848 {
849         struct cdt_restore_handle *crh;
850         struct mdt_object *obj;
851         int rc;
852         ENTRY;
853
854         OBD_SLAB_ALLOC_PTR(crh, mdt_hsm_cdt_kmem);
855         if (crh == NULL)
856                 RETURN(-ENOMEM);
857
858         crh->crh_fid = *fid;
859         /* in V1 all file is restored
860          * crh->extent.start = he->offset;
861          * crh->extent.end = he->offset + he->length;
862          */
863         crh->crh_extent.start = 0;
864         crh->crh_extent.end = he->length;
865         atomic_set(&crh->crh_refc, 2);
866
867         rc = rhashtable_lookup_insert_fast(&cdt->cdt_restore_hash,
868                                            &crh->crh_hash, crh_hash_params);
869         if (rc) {
870                 OBD_SLAB_FREE_PTR(crh, mdt_hsm_cdt_kmem);
871                 RETURN(rc);
872         }
873
874         /* get the layout lock */
875         obj = mdt_object_find_lock(mti, &crh->crh_fid, &crh->crh_lh,
876                                    MDS_INODELOCK_LAYOUT, LCK_EX);
877         if (IS_ERR(obj)) {
878                 rc = rhashtable_remove_fast(&cdt->cdt_restore_hash,
879                                             &crh->crh_hash, crh_hash_params);
880                 /* rc < 0 means it has been removed in a parallel thread.
881                  * This shouldn't happen by design as at current stage record
882                  * hasn't been added in llog yet.
883                  */
884                 if (!rc)
885                         cdt_crh_put(crh, mti);
886                 cdt_crh_put(crh, mti);
887
888                 RETURN(PTR_ERR(obj));
889         }
890
891         /* We do not keep a reference on the object during the restore
892          * which can be very long.
893          */
894         mdt_object_put(mti->mti_env, obj);
895         cdt_crh_put(crh, mti);
896         RETURN(rc);
897 }
898
899 /**
900  * lookup a restore handle by FID
901  * \param cdt [IN] coordinator
902  * \param fid [IN] FID
903  * \retval true cdt_restore_handle found
904  * \retval false not found
905  */
906 bool cdt_restore_handle_exists(struct coordinator *cdt,
907                                const struct lu_fid *fid)
908 {
909         return rhashtable_lookup_fast(&cdt->cdt_restore_hash, fid,
910                                       crh_hash_params);
911 }
912
913 void cdt_restore_handle_del(struct mdt_thread_info *mti,
914                             struct coordinator *cdt, const struct lu_fid *fid)
915 {
916         struct cdt_restore_handle *crh;
917
918         /* give back layout lock */
919         rcu_read_lock();
920         crh = rhashtable_lookup(&cdt->cdt_restore_hash, fid, crh_hash_params);
921         if (crh &&
922             rhashtable_remove_fast(&cdt->cdt_restore_hash, &crh->crh_hash,
923                                    crh_hash_params))
924                 crh = NULL;
925         rcu_read_unlock();
926
927         /* crh has been removed in a parallel thread */
928         if (crh == NULL)
929                 return;
930
931         cdt_crh_put(crh, mti);
932 }
933
/**
 * private data handed to hsm_restore_cb() through cdt_llog_process()
 * when scanning the llog for pending restore requests
 */
struct hsm_restore_data {
        /* thread info of the caller running the llog scan */
        struct mdt_thread_info *hrd_mti;
};
941
942 /**
943  *  llog_cat_process() callback, used to:
944  *  - find restore request and allocate the restore handle
945  * \param env [IN] environment
946  * \param llh [IN] llog handle
947  * \param hdr [IN] llog record
948  * \param data [IN/OUT] cb data = struct hsm_restore_data
949  * \retval 0 success
950  * \retval -ve failure
951  */
952 static int hsm_restore_cb(const struct lu_env *env,
953                           struct llog_handle *llh,
954                           struct llog_rec_hdr *hdr, void *data)
955 {
956         struct llog_agent_req_rec       *larr;
957         struct hsm_restore_data         *hrd;
958         struct hsm_action_item          *hai;
959         struct mdt_thread_info          *mti;
960         struct coordinator              *cdt;
961         int rc;
962         ENTRY;
963
964         hrd = data;
965         mti = hrd->hrd_mti;
966         cdt = &mti->mti_mdt->mdt_coordinator;
967
968         larr = (struct llog_agent_req_rec *)hdr;
969         hai = &larr->arr_hai;
970
971         if (hai->hai_cookie > atomic64_read(&cdt->cdt_last_cookie)) {
972                 /* update the cookie to avoid collision */
973                 atomic64_set(&cdt->cdt_last_cookie, hai->hai_cookie);
974         }
975
976         if (hai->hai_action != HSMA_RESTORE ||
977             agent_req_in_final_state(larr->arr_status))
978                 RETURN(0);
979
980         /* restore request not in a final state */
981
982         /* force replay of restore requests left in started state from previous
983          * CDT context, to be canceled later if finally found to be incompatible
984          * when being re-started */
985         if (larr->arr_status == ARS_STARTED) {
986                 larr->arr_status = ARS_WAITING;
987                 larr->arr_req_change = ktime_get_real_seconds();
988                 rc = llog_write(env, llh, hdr, hdr->lrh_index);
989                 if (rc != 0)
990                         GOTO(out, rc);
991         }
992
993         rc = cdt_restore_handle_add(mti, cdt, &hai->hai_fid, &hai->hai_extent);
994         if (rc == -EEXIST) {
995                 CWARN("%s: duplicate restore record for fid="DFID" found in the llog: rc = %d\n",
996                       mdt_obd_name(mti->mti_mdt), PFID(&hai->hai_fid), rc);
997                 rc = 0;
998         }
999 out:
1000         RETURN(rc);
1001 }
1002
1003 /**
1004  * restore coordinator state at startup
1005  * the goal is to take a layout lock for each registered restore request
1006  * \param mti [IN] context
1007  */
1008 static int mdt_hsm_pending_restore(struct mdt_thread_info *mti)
1009 {
1010         struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
1011         struct hsm_restore_data  hrd;
1012         int rc;
1013         ENTRY;
1014
1015         hrd.hrd_mti = mti;
1016
1017         rc = cdt_llog_process(mti->mti_env, mti->mti_mdt, hsm_restore_cb, &hrd,
1018                               0, 0);
1019
1020         if (rc < 0)
1021                 RETURN(rc);
1022
1023         /* no pending request found -> start a new session */
1024         if (!atomic64_read(&cdt->cdt_last_cookie))
1025                 atomic64_set(&cdt->cdt_last_cookie, ktime_get_real_seconds());
1026
1027         RETURN(0);
1028 }
1029
1030 int hsm_init_ucred(struct lu_ucred *uc)
1031 {
1032         ENTRY;
1033         uc->uc_valid = UCRED_OLD;
1034         uc->uc_o_uid = 0;
1035         uc->uc_o_gid = 0;
1036         uc->uc_o_fsuid = 0;
1037         uc->uc_o_fsgid = 0;
1038         uc->uc_uid = 0;
1039         uc->uc_gid = 0;
1040         uc->uc_fsuid = 0;
1041         uc->uc_fsgid = 0;
1042         uc->uc_suppgids[0] = -1;
1043         uc->uc_suppgids[1] = -1;
1044         uc->uc_cap = cap_combine(CAP_FS_SET, CAP_NFSD_SET);
1045         uc->uc_umask = 0777;
1046         uc->uc_ginfo = NULL;
1047         uc->uc_identity = NULL;
1048         /* always record internal HSM activity if also enabled globally */
1049         uc->uc_enable_audit = 1;
1050         /* do not let rbac interfere with HSM internal processing */
1051         uc->uc_rbac_file_perms = 1;
1052         uc->uc_rbac_dne_ops = 1;
1053         uc->uc_rbac_quota_ops = 1;
1054         uc->uc_rbac_byfid_ops = 1;
1055         uc->uc_rbac_chlg_ops = 1;
1056         uc->uc_rbac_fscrypt_admin = 1;
1057         uc->uc_rbac_server_upcall = 1;
1058         uc->uc_rbac_ignore_root_prjquota = 1;
1059         uc->uc_rbac_hsm_ops = 1;
1060         uc->uc_rbac_local_admin = 1;
1061
1062         RETURN(0);
1063 }
1064
/* Estimated size of the variable data trailing a struct hsm_action_item */
#define HAI_DATA_SIZE_EST (128)
/* Estimated size of one action item, trailing data included */
#define HAI_SIZE_EST (sizeof(struct hsm_action_item) + HAI_DATA_SIZE_EST)
/* Estimated memory used to cache one active request */
#define HSM_ACTIVE_REQ_SIZE_EST                                         \
        (sizeof(struct cdt_agent_req) + sizeof(struct hsm_mem_req_rec) + \
         HAI_DATA_SIZE_EST)
/* mdt_coordinator prealloc: max_requests * sizeof(struct hsm_scan_request) */
#define HSM_SCAN_REQ_SIZE (sizeof(struct hsm_scan_request))

/* The memory footprint estimation is the sum of the memory needed to build hal
 * requests and the one needed to cache the active requests.
 */
#define HSM_REQ_MEM_FOOTPRINT_EST (HSM_SCAN_REQ_SIZE + HSM_ACTIVE_REQ_SIZE_EST)
1077
1078 static u64 max_requests_total;
1079 static DEFINE_SPINLOCK(max_requests_total_lock);
1080
1081 /* Limit total max_requests to 1/8 total memory */
1082 static int mdt_hsm_max_requests_update(struct coordinator *cdt, u64 new)
1083 {
1084         u64 max_ram = cfs_totalram_pages() * PAGE_SIZE / 8;
1085         int rc = 0;
1086
1087         if (new == cdt->cdt_max_requests)
1088                 return 0;
1089
1090         spin_lock(&max_requests_total_lock);
1091         if (new < cdt->cdt_max_requests) {
1092                 LASSERT(max_requests_total >= cdt->cdt_max_requests - new);
1093                 max_requests_total -= cdt->cdt_max_requests - new;
1094                 cdt->cdt_max_requests = new;
1095         } else if (new > cdt->cdt_max_requests) {
1096                 u64 max_ram_reqs = max_ram / HSM_REQ_MEM_FOOTPRINT_EST;
1097                 u64 to_add = new - cdt->cdt_max_requests;
1098                 struct mdt_device *mdt = container_of(cdt, typeof(*mdt),
1099                                                       mdt_coordinator);
1100
1101                 if (to_add > max_ram_reqs ||
1102                     max_requests_total > max_ram_reqs - to_add) {
1103                         rc = -ENOMEM;
1104                         LCONSOLE_WARN("%s: No more memory to set HSM max_requests=%llu (max request memory: %lluMB, current total %llu/%llu): rc = %d\n",
1105                                       mdt_obd_name(mdt), new, max_ram >> 20,
1106                                       max_requests_total, max_ram_reqs, rc);
1107                         to_add = max_ram_reqs - max_requests_total;
1108                 }
1109
1110                 max_requests_total += to_add;
1111                 cdt->cdt_max_requests += to_add;
1112
1113                 /* no memory available for a new MDT -> allow 1 more request */
1114                 if (!cdt->cdt_max_requests) {
1115                         max_requests_total++;
1116                         cdt->cdt_max_requests++;
1117                 }
1118         }
1119         spin_unlock(&max_requests_total_lock);
1120
1121         return rc;
1122 }
1123
1124 /**
1125  * initialize coordinator struct
1126  * \param mdt [IN] device
1127  * \retval 0 success
1128  * \retval -ve failure
1129  */
1130 int mdt_hsm_cdt_init(struct mdt_device *mdt)
1131 {
1132         struct coordinator      *cdt = &mdt->mdt_coordinator;
1133         struct mdt_thread_info  *cdt_mti = NULL;
1134         int                      rc;
1135         ENTRY;
1136
1137         init_waitqueue_head(&cdt->cdt_waitq);
1138         init_waitqueue_head(&cdt->cdt_cancel_all);
1139         init_rwsem(&cdt->cdt_agent_lock);
1140         init_rwsem(&cdt->cdt_request_lock);
1141         mutex_init(&cdt->cdt_state_lock);
1142         set_cdt_state(cdt, CDT_STOPPED);
1143
1144         INIT_LIST_HEAD(&cdt->cdt_request_list);
1145         INIT_LIST_HEAD(&cdt->cdt_agents);
1146
1147         cdt->cdt_request_cookie_hash = cfs_hash_create("REQUEST_COOKIE_HASH",
1148                                                        CFS_HASH_BITS_MIN,
1149                                                        CFS_HASH_BITS_MAX,
1150                                                        CFS_HASH_BKT_BITS,
1151                                                        0 /* extra bytes */,
1152                                                        CFS_HASH_MIN_THETA,
1153                                                        CFS_HASH_MAX_THETA,
1154                                                 &cdt_request_cookie_hash_ops,
1155                                                        CFS_HASH_DEFAULT);
1156         if (cdt->cdt_request_cookie_hash == NULL)
1157                 RETURN(-ENOMEM);
1158
1159         rc = lu_env_init(&cdt->cdt_env, LCT_MD_THREAD);
1160         if (rc < 0)
1161                 GOTO(out_request_cookie_hash, rc);
1162
1163         /* for mdt_ucred(), lu_ucred stored in lu_ucred_key */
1164         rc = lu_context_init(&cdt->cdt_session, LCT_SERVER_SESSION);
1165         if (rc < 0)
1166                 GOTO(out_env, rc);
1167
1168         lu_context_enter(&cdt->cdt_session);
1169         cdt->cdt_env.le_ses = &cdt->cdt_session;
1170
1171         cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
1172         LASSERT(cdt_mti != NULL);
1173
1174         cdt_mti->mti_env = &cdt->cdt_env;
1175         cdt_mti->mti_mdt = mdt;
1176
1177         hsm_init_ucred(mdt_ucred(cdt_mti));
1178
1179         /* default values for sysfs tunnables
1180          * can be override by MGS conf */
1181         cdt->cdt_default_archive_id = 1;
1182         cdt->cdt_grace_delay = 60;
1183         cdt->cdt_loop_period = 10;
1184         cdt->cdt_policy = CDT_DEFAULT_POLICY;
1185         cdt->cdt_active_req_timeout = 3600;
1186
1187         cdt->cdt_max_requests = 0;
1188         mdt_hsm_max_requests_update(cdt, 3);
1189
1190         /* by default do not remove archives on last unlink */
1191         cdt->cdt_remove_archive_on_last_unlink = false;
1192         cdt->cdt_idle = true;
1193
1194         RETURN(0);
1195
1196 out_env:
1197         lu_env_fini(&cdt->cdt_env);
1198 out_request_cookie_hash:
1199         cfs_hash_putref(cdt->cdt_request_cookie_hash);
1200         cdt->cdt_request_cookie_hash = NULL;
1201
1202         return rc;
1203 }
1204
1205 /**
1206  * free a coordinator thread
1207  * \param mdt [IN] device
1208  */
1209 int  mdt_hsm_cdt_fini(struct mdt_device *mdt)
1210 {
1211         struct coordinator *cdt = &mdt->mdt_coordinator;
1212         ENTRY;
1213
1214         mdt_hsm_max_requests_update(cdt, 0);
1215
1216         lu_context_exit(cdt->cdt_env.le_ses);
1217         lu_context_fini(cdt->cdt_env.le_ses);
1218
1219         lu_env_fini(&cdt->cdt_env);
1220
1221         cfs_hash_putref(cdt->cdt_request_cookie_hash);
1222         cdt->cdt_request_cookie_hash = NULL;
1223
1224         RETURN(0);
1225 }
1226
1227 /**
1228  * start a coordinator thread
1229  * \param mdt [IN] device
1230  * \retval 0 success
1231  * \retval -ve failure
1232  */
1233 static int mdt_hsm_cdt_start(struct mdt_device *mdt)
1234 {
1235         struct coordinator *cdt = &mdt->mdt_coordinator;
1236         struct mdt_thread_info *cdt_mti;
1237         int rc;
1238         void *ptr;
1239         struct task_struct *task;
1240         ENTRY;
1241
1242         /* functions defined but not yet used
1243          * this avoid compilation warning
1244          */
1245         ptr = dump_requests;
1246
1247         rc = set_cdt_state(cdt, CDT_INIT);
1248         if (rc) {
1249                 CERROR("%s: Coordinator already started or stopping\n",
1250                        mdt_obd_name(mdt));
1251                 RETURN(-EALREADY);
1252         }
1253
1254         BUILD_BUG_ON(BIT(CDT_POLICY_SHIFT_COUNT - 1) != CDT_POLICY_LAST);
1255         cdt->cdt_policy = CDT_DEFAULT_POLICY;
1256
1257         atomic_set(&cdt->cdt_request_count, 0);
1258         atomic_set(&cdt->cdt_archive_count, 0);
1259         atomic_set(&cdt->cdt_restore_count, 0);
1260         atomic_set(&cdt->cdt_remove_count, 0);
1261         cdt->cdt_user_request_mask = (1UL << HSMA_RESTORE);
1262         cdt->cdt_group_request_mask = (1UL << HSMA_RESTORE);
1263         cdt->cdt_other_request_mask = (1UL << HSMA_RESTORE);
1264         rc = rhashtable_init(&cdt->cdt_restore_hash, &crh_hash_params);
1265         if (rc) {
1266                 CERROR("%s: failed to create cdt_restore hash: rc = %d\n",
1267                        mdt_obd_name(mdt), rc);
1268                 set_cdt_state(cdt, CDT_STOPPED);
1269                 RETURN(rc);
1270         }
1271
1272         /* to avoid deadlock when start is made through sysfs
1273          * sysfs entries are created by the coordinator thread
1274          */
1275         if (mdt->mdt_bottom->dd_rdonly)
1276                 RETURN(0);
1277
1278         cdt_mti = lu_context_key_get(&cdt->cdt_env.le_ctx, &mdt_thread_key);
1279         task = kthread_run(mdt_coordinator, cdt_mti, "hsm_cdtr");
1280         if (IS_ERR(task)) {
1281                 rc = PTR_ERR(task);
1282                 set_cdt_state(cdt, CDT_STOPPED);
1283                 CERROR("%s: error starting coordinator thread: %d\n",
1284                        mdt_obd_name(mdt), rc);
1285         } else {
1286                 /* Set task under lock to avoid race with mdt_hsm_cdt_stop() */
1287                 mutex_lock(&cdt->cdt_state_lock);
1288                 cdt->cdt_task = task;
1289                 mutex_unlock(&cdt->cdt_state_lock);
1290                 wait_event(cdt->cdt_waitq, cdt->cdt_state != CDT_INIT);
1291                 CDEBUG(D_HSM, "%s: coordinator thread started\n",
1292                        mdt_obd_name(mdt));
1293                 rc = 0;
1294         }
1295
1296         RETURN(rc);
1297 }
1298
1299 /**
1300  * stop a coordinator thread
1301  * \param mdt [IN] device
1302  */
1303 int mdt_hsm_cdt_stop(struct mdt_device *mdt)
1304 {
1305         struct coordinator *cdt = &mdt->mdt_coordinator;
1306         struct task_struct *task;
1307         int rc;
1308
1309         ENTRY;
1310
1311         /* stop coordinator thread */
1312         rc = set_cdt_state(cdt, CDT_STOPPING);
1313         if (rc)
1314                 RETURN(rc);
1315
1316         /* Get task pointer under lock to avoid race with thread exit */
1317         mutex_lock(&cdt->cdt_state_lock);
1318         task = cdt->cdt_task;
1319         if (task)
1320                 cdt->cdt_task = NULL;
1321         mutex_unlock(&cdt->cdt_state_lock);
1322
1323         /* Only call kthread_stop if we have a valid task */
1324         if (task)
1325                 kthread_stop(task);
1326
1327         rc = wait_event_interruptible(cdt->cdt_waitq,
1328                                       cdt->cdt_state == CDT_STOPPED);
1329         if (rc)
1330                 RETURN(-EINTR);
1331
1332         RETURN(0);
1333 }
1334
1335 static int mdt_hsm_set_exists(struct mdt_thread_info *mti,
1336                               const struct lu_fid *fid,
1337                               u32 archive_id)
1338 {
1339         struct mdt_object *obj;
1340         struct md_hsm mh;
1341         int rc;
1342
1343         obj = mdt_hsm_get_md_hsm(mti, fid, &mh);
1344         if (IS_ERR(obj))
1345                 GOTO(out, rc = PTR_ERR(obj));
1346
1347         if (mh.mh_flags & HS_EXISTS &&
1348             mh.mh_arch_id == archive_id)
1349                 GOTO(out_obj, rc = 0);
1350
1351         mh.mh_flags |= HS_EXISTS;
1352         mh.mh_arch_id = archive_id;
1353         rc = mdt_hsm_attr_set(mti, obj, &mh);
1354
1355 out_obj:
1356         mdt_object_put(mti->mti_env, obj);
1357 out:
1358         return rc;
1359 }
1360
1361 /**
1362  * register all agent requests from a scan phase
1363  * \param mti [IN] context
1364  * \param rq [IN] request
1365  * \param uuid [OUT] in case of CANCEL, the uuid of the agent
1366  *  which is running the CT
1367  * \retval 0 success
1368  * \retval -ve failure
1369  */
1370 int mdt_hsm_add_hsr(struct mdt_thread_info *mti, struct hsm_scan_request *rq,
1371                     struct obd_uuid *uuid)
1372 {
1373         struct mdt_device       *mdt = mti->mti_mdt;
1374         struct coordinator      *cdt = &mdt->mdt_coordinator;
1375         struct cdt_agent_req *car;
1376         struct hsm_mem_req_rec  *hmm;
1377         struct hsm_action_item  *hai;
1378         int                      rc = 0;
1379         ENTRY;
1380
1381         /* register request in memory list */
1382         list_for_each_entry(car, &rq->hsr_cars, car_scan_list) {
1383                 hmm = car->car_hmm;
1384                 if (hmm->mr_rec.arr_status == ARS_FAILED)
1385                         continue;
1386
1387                 hai = &car->car_hai;
1388
1389                 /* in case of a cancel request, we first mark the ondisk
1390                  * record of the request we want to stop as canceled
1391                  * this does not change the cancel record
1392                  * it will be done when updating the request status
1393                  */
1394                 if (hai->hai_action == HSMA_CANCEL) {
1395                         struct cdt_agent_req *orig;
1396                         struct hsm_action_item *h;
1397
1398                         /* find the running request to set it canceled */
1399                         orig = mdt_cdt_find_request(cdt, hai->hai_cookie);
1400                         if (!orig)
1401                                 continue;
1402
1403                         h = &orig->car_hai;
1404                         if (orig->car_cancel) {
1405                                 CDEBUG(D_HSM,
1406                                        "%s: %llx already canceled %s "DFID"\n",
1407                                        mdt_obd_name(mdt), h->hai_cookie,
1408                                        hsm_copytool_action2name(h->hai_action),
1409                                        PFID(&h->hai_fid));
1410                                 mdt_cdt_put_request(orig);
1411                                 continue;
1412                         }
1413                         orig->car_hmm->mr_rec.arr_status = ARS_CANCELED;
1414
1415                         rc = mdt_hsm_agent_modify_record(mti->mti_env, mdt,
1416                                                          orig->car_hmm);
1417
1418                         if (rc) {
1419                                 CERROR("%s: modify record failed, cannot update status to %s for cookie %#llx : rc = %d\n",
1420                                        mdt_obd_name(mdt),
1421                                        agent_req_status2name(ARS_CANCELED),
1422                                        hai->hai_cookie, rc);
1423                                 mdt_cdt_put_request(orig);
1424                                 GOTO(out, rc);
1425                         }
1426
1427                         /* orig holding cancel request orig->car_cancel */
1428                         mdt_cdt_get_request(car);
1429                         /* uuid has to be changed to the one running the
1430                          * request to cancel
1431                          */
1432                         *uuid = orig->car_uuid;
1433                         car->car_uuid = orig->car_uuid;
1434                         orig->car_cancel = car;
1435                         continue;
1436                 }
1437
1438                 if (hai->hai_action == HSMA_ARCHIVE) {
1439                         rc = mdt_hsm_set_exists(mti, &hai->hai_fid,
1440                                                 hsr_get_archive_id(rq));
1441                         if (rc == -ENOENT)
1442                                 continue;
1443                         else if (rc < 0)
1444                                 GOTO(out, rc);
1445                 }
1446
1447                 car->car_uuid = *uuid;
1448                 rc = mdt_cdt_add_request(cdt, car);
1449                 if (rc)
1450                         break;
1451         }
1452 out:
1453         RETURN(rc);
1454 }
1455
1456 /**
1457  * swap layouts between 2 fids
1458  * \param mti [IN] context
1459  * \param obj [IN]
1460  * \param dfid [IN]
1461  * \param mh_common [IN] MD HSM
1462  */
1463 static int hsm_swap_layouts(struct mdt_thread_info *mti,
1464                             struct mdt_object *obj, const struct lu_fid *dfid,
1465                             struct md_hsm *mh_common)
1466 {
1467         struct mdt_object       *dobj;
1468         struct mdt_lock_handle  *dlh;
1469         int                      rc;
1470         ENTRY;
1471
1472         if (!mdt_object_exists(obj))
1473                 GOTO(out, rc = -ENOENT);
1474
1475         /* we already have layout lock on obj so take only
1476          * on dfid */
1477         dlh = &mti->mti_lh[MDT_LH_OLD];
1478         dobj = mdt_object_find_lock(mti, dfid, dlh, MDS_INODELOCK_LAYOUT,
1479                                     LCK_EX);
1480         if (IS_ERR(dobj))
1481                 GOTO(out, rc = PTR_ERR(dobj));
1482
1483         /* if copy tool closes the volatile before sending the final
1484          * progress through llapi_hsm_copy_end(), all the objects
1485          * are removed and mdd_swap_layout LBUG */
1486         if (!mdt_object_exists(dobj)) {
1487                 CERROR("%s: Copytool has closed volatile file "DFID"\n",
1488                        mdt_obd_name(mti->mti_mdt), PFID(dfid));
1489                 GOTO(out_dobj, rc = -ENOENT);
1490         }
1491         /* Since we only handle restores here, unconditionally use
1492          * SWAP_LAYOUTS_MDS_HSM flag to ensure original layout will
1493          * be preserved in case of failure during swap_layout and not
1494          * leave a file in an intermediate but incoherent state.
1495          * But need to setup HSM xattr of data FID before, reuse
1496          * mti and mh presets for FID in hsm_cdt_request_completed(),
1497          * only need to clear RELEASED and DIRTY.
1498          */
1499         mh_common->mh_flags &= ~(HS_RELEASED | HS_DIRTY);
1500         rc = mdt_hsm_attr_set(mti, dobj, mh_common);
1501         if (rc)
1502                 GOTO(out_dobj, rc);
1503
1504         rc = mo_swap_layouts(mti->mti_env, mdt_object_child(obj),
1505                              mdt_object_child(dobj), 0, 0, 0);
1506         if (rc)
1507                 GOTO(out_dobj, rc);
1508
1509         rc = mdt_lsom_downgrade(mti, obj);
1510         if (rc)
1511                 CDEBUG(D_INODE,
1512                        "%s: File fid="DFID" SOM downgrade failed, rc = %d\n",
1513                        mdt_obd_name(mti->mti_mdt),
1514                        PFID(mdt_object_fid(obj)), rc);
1515 out_dobj:
1516         mdt_object_unlock_put(mti, dobj, dlh, 1);
1517 out:
1518         RETURN(rc);
1519 }
1520
1521 /**
1522  * update status of a completed request
1523  * \param mti [IN] context
1524  * \param pgs [IN] progress of the copy tool
1525  * \retval 0 success
1526  * \retval -ve failure
1527  */
1528 static int hsm_cdt_request_completed(struct mdt_thread_info *mti,
1529                                      struct hsm_progress_kernel *pgs,
1530                                      const struct cdt_agent_req *car,
1531                                      enum agent_req_status *status)
1532 {
1533         const struct lu_env *env = mti->mti_env;
1534         struct mdt_device *mdt = mti->mti_mdt;
1535         struct coordinator *cdt = &mdt->mdt_coordinator;
1536         struct mdt_object *obj = NULL;
1537         enum changelog_rec_flags clf_flags = 0;
1538         struct md_hsm mh;
1539         bool is_mh_changed;
1540         bool need_changelog = true;
1541         int rc = 0;
1542
1543         ENTRY;
1544         /* default is to retry */
1545         *status = ARS_WAITING;
1546
1547         /* find object by FID, mdt_hsm_get_md_hsm() returns obj or err
1548          * if error/removed continue anyway to get correct reporting done */
1549         obj = mdt_hsm_get_md_hsm(mti, &car->car_hai.hai_fid, &mh);
1550         /* we will update MD HSM only if needed */
1551         is_mh_changed = false;
1552
1553         /* no need to change mh->mh_arch_id
1554          * mdt_hsm_get_md_hsm() got it from disk and it is still valid
1555          */
1556         if (pgs->hpk_errval != 0) {
1557                 switch (pgs->hpk_errval) {
1558                 case ENOSYS:
1559                         /* the copy tool does not support cancel
1560                          * so the cancel request is failed
1561                          * As we cannot distinguish a cancel progress
1562                          * from another action progress (they have the
1563                          * same cookie), we suppose here the CT returns
1564                          * ENOSYS only if does not support cancel
1565                          */
1566                         /* this can also happen when cdt calls it to
1567                          * for a timed out request */
1568                         *status = ARS_FAILED;
1569                         /* to have a cancel event in changelog */
1570                         pgs->hpk_errval = ECANCELED;
1571                         break;
1572                 case ECANCELED:
1573                         /* the request record has already been set to
1574                          * ARS_CANCELED, this set the cancel request
1575                          * to ARS_SUCCEED */
1576                         *status = ARS_SUCCEED;
1577                         break;
1578                 default:
1579                         /* retry only if current policy or requested, and
1580                          * object is not on error/removed */
1581                         *status = (cdt->cdt_policy & CDT_NORETRY_ACTION ||
1582                                    !(pgs->hpk_flags & HP_FLAG_RETRY) ||
1583                                    IS_ERR(obj)) ? ARS_FAILED : ARS_WAITING;
1584                         break;
1585                 }
1586
1587                 rc = hsm_set_cl_error(&clf_flags, pgs->hpk_errval);
1588                 if (rc == -EOVERFLOW) {
1589                         CERROR("%s: Request %#llx on "DFID" failed, error code %d too large\n",
1590                                mdt_obd_name(mdt), pgs->hpk_cookie,
1591                                PFID(&pgs->hpk_fid), (int)abs(pgs->hpk_errval));
1592                         rc = 0;
1593                 }
1594
1595                 switch (car->car_hai.hai_action) {
1596                 case HSMA_ARCHIVE:
1597                         hsm_set_cl_event(&clf_flags, HE_ARCHIVE);
1598                         break;
1599                 case HSMA_RESTORE:
1600                         hsm_set_cl_event(&clf_flags, HE_RESTORE);
1601                         break;
1602                 case HSMA_REMOVE:
1603                         hsm_set_cl_event(&clf_flags, HE_REMOVE);
1604                         break;
1605                 case HSMA_CANCEL:
1606                         hsm_set_cl_event(&clf_flags, HE_CANCEL);
1607                         CERROR("%s: Failed request %#llx on "DFID
1608                                " cannot be a CANCEL\n",
1609                                mdt_obd_name(mdt),
1610                                pgs->hpk_cookie,
1611                                PFID(&pgs->hpk_fid));
1612                         break;
1613                 default:
1614                         CERROR("%s: Failed request %#llx on "DFID
1615                                " %d is an unknown action\n",
1616                                mdt_obd_name(mdt),
1617                                pgs->hpk_cookie, PFID(&pgs->hpk_fid),
1618                                car->car_hai.hai_action);
1619                         rc = -EINVAL;
1620                         break;
1621                 }
1622         } else {
1623                 *status = ARS_SUCCEED;
1624                 switch (car->car_hai.hai_action) {
1625                 case HSMA_ARCHIVE:
1626                         hsm_set_cl_event(&clf_flags, HE_ARCHIVE);
1627                         /* set ARCHIVE keep EXIST and clear LOST and
1628                          * DIRTY */
1629                         mh.mh_arch_ver = pgs->hpk_data_version;
1630                         mh.mh_flags |= HS_ARCHIVED;
1631                         mh.mh_flags &= ~(HS_LOST|HS_DIRTY);
1632                         is_mh_changed = true;
1633                         break;
1634                 case HSMA_RESTORE:
1635                         hsm_set_cl_event(&clf_flags, HE_RESTORE);
1636
1637                         /* do not clear RELEASED and DIRTY here
1638                          * this will occur in hsm_swap_layouts()
1639                          */
1640
1641                         /* Restoring has changed the file version on
1642                          * disk. */
1643                         mh.mh_arch_ver = pgs->hpk_data_version;
1644                         is_mh_changed = true;
1645                         break;
1646                 case HSMA_REMOVE:
1647                         hsm_set_cl_event(&clf_flags, HE_REMOVE);
1648                         /* clear ARCHIVED EXISTS and LOST */
1649                         mh.mh_flags &= ~(HS_ARCHIVED | HS_EXISTS | HS_LOST);
1650                         is_mh_changed = true;
1651                         break;
1652                 case HSMA_CANCEL:
1653                         hsm_set_cl_event(&clf_flags, HE_CANCEL);
1654                         CERROR("%s: Successful request %#llx on "DFID" cannot be a CANCEL\n",
1655                                mdt_obd_name(mdt),
1656                                pgs->hpk_cookie,
1657                                PFID(&pgs->hpk_fid));
1658                         break;
1659                 default:
1660                         CERROR("%s: Successful request %#llx on "DFID" %d is an unknown action\n",
1661                                mdt_obd_name(mdt),
1662                                pgs->hpk_cookie, PFID(&pgs->hpk_fid),
1663                                car->car_hai.hai_action);
1664                         rc = -EINVAL;
1665                         break;
1666                 }
1667         }
1668
1669         /* rc != 0 means error when analysing action, it may come from
1670          * a crasy CT no need to manage DIRTY
1671          * and if mdt_hsm_get_md_hsm() has returned an error, mh has not been
1672          * filled
1673          */
1674         if (rc == 0 && !IS_ERR(obj))
1675                 hsm_set_cl_flags(&clf_flags,
1676                                  mh.mh_flags & HS_DIRTY ? CLF_HSM_DIRTY : 0);
1677
1678         /* unlock is done later, after layout lock management */
1679         if (is_mh_changed && !IS_ERR(obj))
1680                 rc = mdt_hsm_attr_set(mti, obj, &mh);
1681
1682         /* we give back layout lock only if restore was successful or
1683          * if no retry will be attempted and if object is still alive,
1684          * in other cases we just unlock the object */
1685         if (car->car_hai.hai_action == HSMA_RESTORE) {
1686                 struct mdt_lock_handle *lh;
1687
1688                 /* restore in data FID done, we swap the layouts
1689                  * only if restore is successful */
1690                 if (pgs->hpk_errval == 0 && !IS_ERR(obj)) {
1691                         rc = hsm_swap_layouts(mti, obj, &car->car_hai.hai_dfid,
1692                                               &mh);
1693                         if (rc) {
1694                                 if (cdt->cdt_policy & CDT_NORETRY_ACTION)
1695                                         *status = ARS_FAILED;
1696                                 pgs->hpk_errval = -rc;
1697                                 hsm_set_cl_error(&clf_flags, pgs->hpk_errval);
1698                         }
1699                 }
1700                 /* we have to retry, so keep layout lock */
1701                 if (*status == ARS_WAITING)
1702                         GOTO(out, rc);
1703
1704                 /* restore special case, need to create ChangeLog record
1705                  * before to give back layout lock to avoid concurrent
1706                  * file updater to post out of order ChangeLog */
1707                 mo_changelog(env, CL_HSM, clf_flags, mdt->mdt_child,
1708                              &car->car_hai.hai_fid);
1709                 need_changelog = false;
1710
1711                 cdt_restore_handle_del(mti, cdt, &car->car_hai.hai_fid);
1712                 if (!IS_ERR_OR_NULL(obj)) {
1713                         /* flush UPDATE lock so attributes are upadated */
1714                         lh = &mti->mti_lh[MDT_LH_OLD];
1715                         mdt_object_lock(mti, obj, lh, MDS_INODELOCK_UPDATE,
1716                                         LCK_EX);
1717                         mdt_object_unlock(mti, obj, lh, 1);
1718                 }
1719         }
1720
1721         GOTO(out, rc);
1722
1723 out:
1724         /* always add a ChangeLog record */
1725         if (need_changelog)
1726                 mo_changelog(env, CL_HSM, clf_flags, mdt->mdt_child,
1727                              &car->car_hai.hai_fid);
1728
1729         if (!IS_ERR(obj))
1730                 mdt_object_put(mti->mti_env, obj);
1731
1732         RETURN(rc);
1733 }
1734
1735 /**
1736  * update status of a request
1737  * \param mti [IN] context
1738  * \param pgs [IN] progress of the copy tool
1739  * \retval 0 success
1740  * \retval -ve failure
1741  */
1742 int mdt_hsm_update_request_state(struct mdt_thread_info *mti,
1743                                  struct hsm_progress_kernel *pgs)
1744 {
1745         struct mdt_device       *mdt = mti->mti_mdt;
1746         struct coordinator      *cdt = &mdt->mdt_coordinator;
1747         struct cdt_agent_req    *car;
1748         int                      rc = 0;
1749         ENTRY;
1750
1751         /* no coordinator started, so we cannot serve requests */
1752         if (!cdt_getref_try(cdt))
1753                 RETURN(-EAGAIN);
1754
1755         /* first do sanity checks */
1756         car = mdt_cdt_update_request(cdt, pgs);
1757         if (IS_ERR(car)) {
1758                 CERROR("%s: Cannot find running request for cookie %#llx"
1759                        " on fid="DFID"\n",
1760                        mdt_obd_name(mdt),
1761                        pgs->hpk_cookie, PFID(&pgs->hpk_fid));
1762
1763                 GOTO(putref, rc = PTR_ERR(car));
1764         }
1765
1766         CDEBUG(D_HSM, "Progress received for fid="DFID" cookie=%#llx"
1767                       " action=%s flags=%d err=%d fid="DFID" dfid="DFID"\n",
1768                       PFID(&pgs->hpk_fid), pgs->hpk_cookie,
1769                       hsm_copytool_action2name(car->car_hai.hai_action),
1770                       pgs->hpk_flags, pgs->hpk_errval,
1771                       PFID(&car->car_hai.hai_fid),
1772                       PFID(&car->car_hai.hai_dfid));
1773
1774         /* progress is done on FID or data FID depending of the action and
1775          * of the copy progress */
1776         /* for restore progress is used to send back the data FID to cdt */
1777         if (car->car_hai.hai_action == HSMA_RESTORE &&
1778             lu_fid_eq(&car->car_hai.hai_fid, &car->car_hai.hai_dfid))
1779                 car->car_hai.hai_dfid = pgs->hpk_fid;
1780
1781         if ((car->car_hai.hai_action == HSMA_RESTORE ||
1782              car->car_hai.hai_action == HSMA_ARCHIVE) &&
1783             (!lu_fid_eq(&pgs->hpk_fid, &car->car_hai.hai_dfid) &&
1784              !lu_fid_eq(&pgs->hpk_fid, &car->car_hai.hai_fid))) {
1785                 CERROR("%s: Progress on "DFID" for cookie %#llx"
1786                        " does not match request FID "DFID" nor data FID "
1787                        DFID"\n",
1788                        mdt_obd_name(mdt),
1789                        PFID(&pgs->hpk_fid), pgs->hpk_cookie,
1790                        PFID(&car->car_hai.hai_fid),
1791                        PFID(&car->car_hai.hai_dfid));
1792                 GOTO(out, rc = -EINVAL);
1793         }
1794
1795         if (pgs->hpk_errval != 0 && !(pgs->hpk_flags & HP_FLAG_COMPLETED)) {
1796                 CERROR("%s: Progress on "DFID" for cookie %#llx action=%s"
1797                        " is not coherent (err=%d and not completed"
1798                        " (flags=%d))\n",
1799                        mdt_obd_name(mdt),
1800                        PFID(&pgs->hpk_fid), pgs->hpk_cookie,
1801                        hsm_copytool_action2name(car->car_hai.hai_action),
1802                        pgs->hpk_errval, pgs->hpk_flags);
1803                 GOTO(out, rc = -EINVAL);
1804         }
1805
1806         /* now progress is valid */
1807
1808         /* we use a root like ucred */
1809         hsm_init_ucred(mdt_ucred(mti));
1810
1811         if (pgs->hpk_flags & HP_FLAG_COMPLETED) {
1812                 enum agent_req_status status;
1813                 struct hsm_mem_req_rec *hmm;
1814                 int rc1;
1815
1816                 rc = hsm_cdt_request_completed(mti, pgs, car, &status);
1817
1818                 /* if original record was canceled, need to update cancel rec */
1819                 if (unlikely(car->car_cancel))
1820                         hmm = car->car_cancel->car_hmm;
1821                 else
1822                         hmm = car->car_hmm;
1823
1824                 CDEBUG(D_HSM, "updating record: fid="DFID" cookie=%#llx action=%s status=%s to %s\n",
1825                        PFID(&pgs->hpk_fid), pgs->hpk_cookie,
1826                        hsm_copytool_action2name(hmm->mr_rec.arr_hai.hai_action),
1827                        agent_req_status2name(hmm->mr_rec.arr_status),
1828                        agent_req_status2name(status));
1829
1830                 if (hmm->mr_rec.arr_status == ARS_STARTED ||
1831                     hmm->mr_rec.arr_status == ARS_WAITING) {
1832                         /* update record first (LU-9075) */
1833                         hmm->mr_rec.arr_status = status;
1834
1835                         rc1 = mdt_hsm_agent_modify_record(mti->mti_env, mdt,
1836                                                           hmm);
1837
1838                         if (rc1)
1839                                 CERROR("%s: modify record failed, cannot update status to %s for cookie %#llx: rc = %d\n",
1840                                        mdt_obd_name(mdt),
1841                                        agent_req_status2name(status),
1842                                        pgs->hpk_cookie, rc1);
1843                         rc = (rc != 0 ? rc : rc1);
1844                 }
1845                 /* then remove request from memory list (LU-9075) */
1846                 mdt_cdt_remove_request(cdt, pgs->hpk_cookie);
1847
1848                 /* ct has completed a request, so a slot is available,
1849                  * signal the coordinator to find new work */
1850                 mdt_hsm_cdt_event(cdt);
1851         } else {
1852                 /* if copytool send a progress on a canceled request
1853                  * we inform copytool it should stop
1854                  */
1855                 if (car->car_cancel)
1856                         rc = -ECANCELED;
1857         }
1858         GOTO(out, rc);
1859
1860 out:
1861         /* remove ref got from mdt_cdt_update_request() */
1862         mdt_cdt_put_request(car);
1863
1864 putref:
1865         cdt_putref(cdt);
1866         return rc;
1867 }
1868
1869
1870 /**
1871  *  llog_cat_process() callback, used to:
1872  *  - purge all requests
1873  * \param env [IN] environment
1874  * \param llh [IN] llog handle
1875  * \param hdr [IN] llog record
1876  * \param data [IN] cb data = struct mdt_thread_info
1877  * \retval 0 success
1878  * \retval -ve failure
1879  */
1880 static int mdt_cancel_all_cb(const struct lu_env *env,
1881                              struct llog_handle *llh,
1882                              struct llog_rec_hdr *hdr, void *data)
1883 {
1884         struct llog_agent_req_rec *larr = (struct llog_agent_req_rec *)hdr;
1885         struct hsm_action_item *hai = &larr->arr_hai;
1886         struct mdt_thread_info  *mti = data;
1887         struct coordinator *cdt = &mti->mti_mdt->mdt_coordinator;
1888         int rc;
1889         ENTRY;
1890
1891         CDEBUG(D_TRACE, "%s: HSM record idx %d "DFID" status %s\n",
1892                mdt_obd_name(mti->mti_mdt), hdr->lrh_index,
1893                PFID(&hai->hai_fid),
1894                agent_req_status2name(larr->arr_status));
1895
1896         if (larr->arr_status != ARS_WAITING &&
1897             larr->arr_status != ARS_STARTED)
1898                 RETURN(0);
1899
1900         /* Unlock the EX layout lock */
1901         if (hai->hai_action == HSMA_RESTORE)
1902                 cdt_restore_handle_del(mti, cdt, &hai->hai_fid);
1903
1904         larr->arr_status = ARS_CANCELED;
1905         larr->arr_req_change = ktime_get_real_seconds();
1906         rc = llog_write(env, llh, hdr, hdr->lrh_index);
1907         if (rc < 0) {
1908                 CERROR("%s: cannot update agent log: rc = %d\n",
1909                        mdt_obd_name(mti->mti_mdt), rc);
1910                 rc = LLOG_DEL_RECORD;
1911         }
1912
1913         RETURN(rc);
1914 }
1915
1916 /**
1917  * cancel all actions
1918  * \param obd [IN] MDT device
1919  */
1920 static int hsm_cancel_all_actions(struct mdt_device *mdt)
1921 {
1922         struct lu_env env;
1923         struct lu_context session;
1924         struct mdt_thread_info *mti;
1925         struct coordinator *cdt = &mdt->mdt_coordinator;
1926         struct cdt_agent_req *car;
1927         char fsname[MTI_NAME_MAXLEN];
1928         struct hsm_scan_request rq = {
1929                 .hsr_version = HAL_VERSION,
1930                 .hsr_fsname = &fsname[0],
1931                 .hsr_count = 1,};
1932         enum cdt_states old_state;
1933         int rc;
1934
1935         ENTRY;
1936
1937         rc = lu_env_init(&env, LCT_MD_THREAD);
1938         if (rc < 0)
1939                 RETURN(rc);
1940
1941         /* for mdt_ucred(), lu_ucred stored in lu_ucred_key */
1942         rc = lu_context_init(&session, LCT_SERVER_SESSION);
1943         if (rc < 0)
1944                 GOTO(out_env, rc);
1945
1946         lu_context_enter(&session);
1947         env.le_ses = &session;
1948
1949         mti = lu_context_key_get(&env.le_ctx, &mdt_thread_key);
1950         LASSERT(mti != NULL);
1951
1952         mti->mti_env = &env;
1953         mti->mti_mdt = mdt;
1954
1955         hsm_init_ucred(mdt_ucred(mti));
1956         obd_uuid2fsname(rq.hsr_fsname, mdt_obd_name(mdt), MTI_NAME_MAXLEN);
1957
1958         mutex_lock(&cdt->cdt_state_lock);
1959         old_state = cdt->cdt_state;
1960
1961         /* disable coordinator */
1962         rc = set_cdt_state_locked(cdt, CDT_DISABLE);
1963         if (rc)
1964                 GOTO(out_cdt_state_unlock, rc);
1965
1966         /* waits while coordinator finish work */
1967         if (wait_event_interruptible(cdt->cdt_cancel_all, cdt->cdt_idle))
1968                 GOTO(out_cdt_state, rc = -EINTR);
1969
1970         /* send cancel to all running requests */
1971         down_read(&cdt->cdt_request_lock);
1972         list_for_each_entry(car, &cdt->cdt_request_list, car_request_list) {
1973                 u32 action;
1974
1975                 /* a cdt_agent_req could be at coordinator sending process
1976                  * still mdt_coordinator()->mdt_hsm_agent_send() in case
1977                  * of network resend inside ptlrpc. Skip it.
1978                  */
1979                 if (!list_empty(&car->car_scan_list))
1980                         continue;
1981
1982                 mdt_cdt_get_request(car);
1983                 /* request is not yet removed from list, it will be done
1984                  * when copytool will return progress
1985                  */
1986
1987                 if (car->car_hai.hai_action == HSMA_CANCEL) {
1988                         mdt_cdt_put_request(car);
1989                         continue;
1990                 }
1991
1992                 INIT_LIST_HEAD(&rq.hsr_cars);
1993                 list_add_tail(&car->car_scan_list, &rq.hsr_cars);
1994                 action = car->car_hai.hai_action;
1995                 car->car_hai.hai_action = HSMA_CANCEL;
1996
1997                 /* no conflict with cdt thread because cdt is disable and we
1998                  * have the request lock */
1999                 mdt_hsm_agent_send(mti, &rq, 1);
2000
2001                 car->car_hai.hai_action = action;
2002                 /* Unlock the EX layout lock */
2003                 if (action == HSMA_RESTORE)
2004                         cdt_restore_handle_del(mti, cdt, &car->car_hai.hai_fid);
2005
2006                 mdt_cdt_put_request(car);
2007         }
2008         up_read(&cdt->cdt_request_lock);
2009
2010         /* cancel all on-disk records */
2011         rc = cdt_llog_process(mti->mti_env, mti->mti_mdt, mdt_cancel_all_cb,
2012                               (void *)mti, 0, 0);
2013 out_cdt_state:
2014         /* Enable coordinator, unless the coordinator was stopping. */
2015         set_cdt_state_locked(cdt, old_state);
2016 out_cdt_state_unlock:
2017         mutex_unlock(&cdt->cdt_state_lock);
2018
2019         lu_context_exit(&session);
2020         lu_context_fini(&session);
2021 out_env:
2022         lu_env_fini(&env);
2023
2024         RETURN(rc);
2025 }
2026
2027 /**
2028  * check if a request is compatible with file status
2029  * \param hai [IN] request description
2030  * \param archive_id [IN] request archive id
2031  * \param rq_flags [IN] request flags
2032  * \param hsm [IN] file HSM metadata
2033  * \retval boolean
2034  */
2035 bool mdt_hsm_is_action_compat(const struct hsm_action_item *hai,
2036                               u32 archive_id, u64 rq_flags,
2037                               const struct md_hsm *hsm)
2038 {
2039         int      is_compat = false;
2040         int      hsm_flags;
2041         ENTRY;
2042
2043         hsm_flags = hsm->mh_flags;
2044         switch (hai->hai_action) {
2045         case HSMA_ARCHIVE:
2046                 if (!(hsm_flags & HS_NOARCHIVE) &&
2047                     (hsm_flags & HS_DIRTY || !(hsm_flags & HS_ARCHIVED)))
2048                         is_compat = true;
2049
2050                 if (hsm_flags & HS_EXISTS &&
2051                     archive_id != 0 &&
2052                     archive_id != hsm->mh_arch_id)
2053                         is_compat = false;
2054
2055                 break;
2056         case HSMA_RESTORE:
2057                 if (!(hsm_flags & HS_DIRTY) && (hsm_flags & HS_RELEASED) &&
2058                     hsm_flags & HS_ARCHIVED && !(hsm_flags & HS_LOST))
2059                         is_compat = true;
2060                 break;
2061         case HSMA_REMOVE:
2062                 if (!(hsm_flags & HS_RELEASED) &&
2063                     (hsm_flags & (HS_ARCHIVED | HS_EXISTS)))
2064                         is_compat = true;
2065                 break;
2066         case HSMA_CANCEL:
2067                 is_compat = true;
2068                 break;
2069         }
2070         CDEBUG(D_HSM, "fid="DFID" action=%s flags=%#llx"
2071                       " extent=%#llx-%#llx hsm_flags=%.8X %s\n",
2072                       PFID(&hai->hai_fid),
2073                       hsm_copytool_action2name(hai->hai_action), rq_flags,
2074                       hai->hai_extent.offset, hai->hai_extent.length,
2075                       hsm->mh_flags,
2076                       (is_compat ? "compatible" : "uncompatible"));
2077
2078         RETURN(is_compat);
2079 }
2080
2081 /*
2082  * sysfs interface used to get/set HSM behaviour (cdt->cdt_policy)
2083  */
2084 static const struct {
2085         __u64            bit;
2086         char            *name;
2087         char            *nickname;
2088 } hsm_policy_names[] = {
2089         { CDT_NONBLOCKING_RESTORE,      "NonBlockingRestore",   "NBR"},
2090         { CDT_NORETRY_ACTION,           "NoRetryAction",        "NRA"},
2091         { 0 },
2092 };
2093
2094 /**
2095  * convert a policy name to a bit
2096  * \param name [IN] policy name
2097  * \retval 0 unknown
2098  * \retval   policy bit
2099  */
2100 static __u64 hsm_policy_str2bit(const char *name)
2101 {
2102         int      i;
2103
2104         for (i = 0; hsm_policy_names[i].bit != 0; i++)
2105                 if (strcmp(hsm_policy_names[i].nickname, name) == 0 ||
2106                     strcmp(hsm_policy_names[i].name, name) == 0)
2107                         return hsm_policy_names[i].bit;
2108         return 0;
2109 }
2110
2111 /**
2112  * convert a policy bit field to a string
2113  * \param mask [IN] policy bit field
2114  * \param hexa [IN] print mask before bit names
2115  * \param buffer [OUT] string
2116  * \param count [IN] size of buffer
2117  */
2118 static void hsm_policy_bit2str(struct seq_file *m, const __u64 mask,
2119                                 const bool hexa)
2120 {
2121         int      i, j;
2122         __u64    bit;
2123         ENTRY;
2124
2125         if (hexa)
2126                 seq_printf(m, "(%#llx) ", mask);
2127
2128         for (i = 0; i < CDT_POLICY_SHIFT_COUNT; i++) {
2129                 bit = (1ULL << i);
2130
2131                 for (j = 0; hsm_policy_names[j].bit != 0; j++) {
2132                         if (hsm_policy_names[j].bit == bit)
2133                                 break;
2134                 }
2135                 if (bit & mask)
2136                         seq_printf(m, "[%s] ", hsm_policy_names[j].name);
2137                 else
2138                         seq_printf(m, "%s ", hsm_policy_names[j].name);
2139         }
2140         /* remove last ' ' */
2141         m->count--;
2142         seq_putc(m, '\n');
2143 }
2144
2145 /* methods to read/write HSM policy flags */
2146 static int mdt_hsm_policy_seq_show(struct seq_file *m, void *data)
2147 {
2148         struct mdt_device       *mdt = m->private;
2149         struct coordinator      *cdt = &mdt->mdt_coordinator;
2150         ENTRY;
2151
2152         hsm_policy_bit2str(m, cdt->cdt_policy, false);
2153         RETURN(0);
2154 }
2155
2156 static ssize_t
2157 mdt_hsm_policy_seq_write(struct file *file, const char __user *buffer,
2158                          size_t count, loff_t *off)
2159 {
2160         struct seq_file         *m = file->private_data;
2161         struct mdt_device       *mdt = m->private;
2162         struct coordinator      *cdt = &mdt->mdt_coordinator;
2163         char                    *start, *token, sign;
2164         char                    *buf;
2165         __u64                    policy;
2166         __u64                    add_mask, remove_mask, set_mask;
2167         int                      rc;
2168         ENTRY;
2169
2170         if (count + 1 > PAGE_SIZE)
2171                 RETURN(-EINVAL);
2172
2173         OBD_ALLOC(buf, count + 1);
2174         if (buf == NULL)
2175                 RETURN(-ENOMEM);
2176
2177         if (copy_from_user(buf, buffer, count))
2178                 GOTO(out, rc = -EFAULT);
2179
2180         buf[count] = '\0';
2181
2182         start = buf;
2183         CDEBUG(D_HSM, "%s: receive new policy: '%s'\n", mdt_obd_name(mdt),
2184                start);
2185
2186         add_mask = remove_mask = set_mask = 0;
2187         do {
2188                 token = strsep(&start, "\n ");
2189                 sign = *token;
2190
2191                 if (sign == '\0')
2192                         continue;
2193
2194                 if (sign == '-' || sign == '+')
2195                         token++;
2196
2197                 policy = hsm_policy_str2bit(token);
2198                 if (policy == 0) {
2199                         CWARN("%s: '%s' is unknown, "
2200                               "supported policies are:\n", mdt_obd_name(mdt),
2201                               token);
2202                         hsm_policy_bit2str(m, 0, false);
2203                         GOTO(out, rc = -EINVAL);
2204                 }
2205                 switch (sign) {
2206                 case '-':
2207                         remove_mask |= policy;
2208                         break;
2209                 case '+':
2210                         add_mask |= policy;
2211                         break;
2212                 default:
2213                         set_mask |= policy;
2214                         break;
2215                 }
2216
2217         } while (start != NULL);
2218
2219         CDEBUG(D_HSM, "%s: new policy: rm=%#llx add=%#llx set=%#llx\n",
2220                mdt_obd_name(mdt), remove_mask, add_mask, set_mask);
2221
2222         /* if no sign in all string, it is a clear and set
2223          * if some sign found, all unsigned are converted
2224          * to add
2225          * P1 P2 = set to P1 and P2
2226          * P1 -P2 = add P1 clear P2 same as +P1 -P2
2227          */
2228         if (remove_mask == 0 && add_mask == 0) {
2229                 cdt->cdt_policy = set_mask;
2230         } else {
2231                 cdt->cdt_policy |= set_mask | add_mask;
2232                 cdt->cdt_policy &= ~remove_mask;
2233         }
2234
2235         GOTO(out, rc = count);
2236
2237 out:
2238         OBD_FREE(buf, count + 1);
2239         RETURN(rc);
2240 }
2241 LDEBUGFS_SEQ_FOPS(mdt_hsm_policy);
2242
2243 static ssize_t loop_period_show(struct kobject *kobj, struct attribute *attr,
2244                                 char *buf)
2245 {
2246         struct coordinator *cdt = container_of(kobj, struct coordinator,
2247                                                cdt_hsm_kobj);
2248
2249         return scnprintf(buf, PAGE_SIZE, "%u\n", cdt->cdt_loop_period);
2250 }
2251
2252 static ssize_t loop_period_store(struct kobject *kobj, struct attribute *attr,
2253                                  const char *buffer, size_t count)
2254 {
2255         struct coordinator *cdt = container_of(kobj, struct coordinator,
2256                                                cdt_hsm_kobj);
2257         unsigned int val;
2258         int rc;
2259
2260         rc = kstrtouint(buffer, 0, &val);
2261         if (rc)
2262                 return rc;
2263
2264         if (val != 0)
2265                 cdt->cdt_loop_period = val;
2266
2267         return val ? count : -EINVAL;
2268 }
2269 LUSTRE_RW_ATTR(loop_period);
2270
2271 static ssize_t grace_delay_show(struct kobject *kobj, struct attribute *attr,
2272                                 char *buf)
2273 {
2274         struct coordinator *cdt = container_of(kobj, struct coordinator,
2275                                                cdt_hsm_kobj);
2276
2277         return scnprintf(buf, PAGE_SIZE, "%u\n", cdt->cdt_grace_delay);
2278 }
2279
2280 static ssize_t grace_delay_store(struct kobject *kobj, struct attribute *attr,
2281                                  const char *buffer, size_t count)
2282 {
2283         struct coordinator *cdt = container_of(kobj, struct coordinator,
2284                                                cdt_hsm_kobj);
2285         unsigned int val;
2286         int rc;
2287
2288         rc = kstrtouint(buffer, 0, &val);
2289         if (rc)
2290                 return rc;
2291
2292         if (val != 0)
2293                 cdt->cdt_grace_delay = val;
2294
2295         return val ? count : -EINVAL;
2296 }
2297 LUSTRE_RW_ATTR(grace_delay);
2298
2299 static ssize_t active_request_timeout_show(struct kobject *kobj,
2300                                            struct attribute *attr,
2301                                            char *buf)
2302 {
2303         struct coordinator *cdt = container_of(kobj, struct coordinator,
2304                                                cdt_hsm_kobj);
2305
2306         return scnprintf(buf, PAGE_SIZE, "%d\n", cdt->cdt_active_req_timeout);
2307 }
2308
2309 static ssize_t active_request_timeout_store(struct kobject *kobj,
2310                                             struct attribute *attr,
2311                                             const char *buffer, size_t count)
2312 {
2313         struct coordinator *cdt = container_of(kobj, struct coordinator,
2314                                                cdt_hsm_kobj);
2315         unsigned int val;
2316         int rc;
2317
2318         rc = kstrtouint(buffer, 0, &val);
2319         if (rc)
2320                 return rc;
2321
2322         if (val != 0)
2323                 cdt->cdt_active_req_timeout = val;
2324
2325         return val ? count : -EINVAL;
2326 }
2327 LUSTRE_RW_ATTR(active_request_timeout);
2328
2329 static ssize_t max_requests_show(struct kobject *kobj, struct attribute *attr,
2330                                  char *buf)
2331 {
2332         struct coordinator *cdt = container_of(kobj, struct coordinator,
2333                                                cdt_hsm_kobj);
2334
2335         return scnprintf(buf, PAGE_SIZE, "%llu\n", cdt->cdt_max_requests);
2336 }
2337
2338 static ssize_t max_requests_store(struct kobject *kobj, struct attribute *attr,
2339                                   const char *buffer, size_t count)
2340 {
2341         struct coordinator *cdt = container_of(kobj, struct coordinator,
2342                                                cdt_hsm_kobj);
2343         unsigned long long val;
2344         int rc;
2345
2346         rc = kstrtoull(buffer, 0, &val);
2347         if (rc)
2348                 return rc;
2349         if (!val)
2350                 return -EINVAL;
2351         rc = mdt_hsm_max_requests_update(cdt, val);
2352         if (rc)
2353                 return rc;
2354
2355         return count;
2356 }
2357 LUSTRE_RW_ATTR(max_requests);
2358
2359 static ssize_t default_archive_id_show(struct kobject *kobj,
2360                                        struct attribute *attr, char *buf)
2361 {
2362         struct coordinator *cdt = container_of(kobj, struct coordinator,
2363                                                cdt_hsm_kobj);
2364
2365         return scnprintf(buf, PAGE_SIZE, "%u\n", cdt->cdt_default_archive_id);
2366 }
2367
2368 static ssize_t default_archive_id_store(struct kobject *kobj,
2369                                         struct attribute *attr,
2370                                         const char *buffer, size_t count)
2371 {
2372         struct coordinator *cdt = container_of(kobj, struct coordinator,
2373                                                cdt_hsm_kobj);
2374         unsigned int val;
2375         int rc;
2376
2377         rc = kstrtouint(buffer, 0, &val);
2378         if (rc)
2379                 return rc;
2380
2381         if (val != 0)
2382                 cdt->cdt_default_archive_id = val;
2383
2384         return val ? count : -EINVAL;
2385 }
2386 LUSTRE_RW_ATTR(default_archive_id);
2387
2388 /*
2389  * procfs write method for MDT/hsm_control
2390  * proc entry is in mdt directory so data is mdt obd_device pointer
2391  */
2392 #define CDT_ENABLE_CMD   "enabled"
2393 #define CDT_STOP_CMD     "shutdown"
2394 #define CDT_DISABLE_CMD  "disabled"
2395 #define CDT_PURGE_CMD    "purge"
2396 #define CDT_HELP_CMD     "help"
2397 #define CDT_MAX_CMD_LEN  10
2398
2399 ssize_t hsm_control_store(struct kobject *kobj, struct attribute *attr,
2400                           const char *buffer, size_t count)
2401 {
2402         struct obd_device *obd = container_of(kobj, struct obd_device,
2403                                               obd_kset.kobj);
2404         struct mdt_device *mdt = mdt_dev(obd->obd_lu_dev);
2405         struct coordinator *cdt = &(mdt->mdt_coordinator);
2406         int usage = 0;
2407         int rc = 0;
2408
2409         if (count == 0 || count >= CDT_MAX_CMD_LEN)
2410                 return -EINVAL;
2411
2412         if (strncmp(buffer, CDT_ENABLE_CMD, strlen(CDT_ENABLE_CMD)) == 0) {
2413                 if (cdt->cdt_state == CDT_DISABLE) {
2414                         rc = set_cdt_state(cdt, CDT_RUNNING);
2415                         mdt_hsm_cdt_event(cdt);
2416                         wake_up(&cdt->cdt_waitq);
2417                 } else if (cdt->cdt_state == CDT_RUNNING) {
2418                         rc = 0;
2419                 } else {
2420                         rc = mdt_hsm_cdt_start(mdt);
2421                 }
2422         } else if (strncmp(buffer, CDT_STOP_CMD, strlen(CDT_STOP_CMD)) == 0) {
2423                 if (cdt->cdt_state == CDT_STOPPING) {
2424                         CERROR("%s: Coordinator is already stopping\n",
2425                                mdt_obd_name(mdt));
2426                         rc = -EALREADY;
2427                 } else if (cdt->cdt_state == CDT_STOPPED) {
2428                         rc = 0;
2429                 } else {
2430                         rc = mdt_hsm_cdt_stop(mdt);
2431                 }
2432         } else if (strncmp(buffer, CDT_DISABLE_CMD,
2433                            strlen(CDT_DISABLE_CMD)) == 0) {
2434                 if ((cdt->cdt_state == CDT_STOPPING) ||
2435                     (cdt->cdt_state == CDT_STOPPED)) {
2436                         /* exit gracefully if coordinator is being stopped
2437                          * or stopped already.
2438                          */
2439                         rc = 0;
2440                 } else {
2441                         rc = set_cdt_state(cdt, CDT_DISABLE);
2442                 }
2443         } else if (strncmp(buffer, CDT_PURGE_CMD,
2444                            strlen(CDT_PURGE_CMD)) == 0) {
2445                 rc = hsm_cancel_all_actions(mdt);
2446         } else if (strncmp(buffer, CDT_HELP_CMD,
2447                            strlen(CDT_HELP_CMD)) == 0) {
2448                 usage = 1;
2449         } else {
2450                 usage = 1;
2451                 rc = -EINVAL;
2452         }
2453
2454         if (usage == 1)
2455                 CERROR("%s: Valid coordinator control commands are: "
2456                        "%s %s %s %s %s\n", mdt_obd_name(mdt),
2457                        CDT_ENABLE_CMD, CDT_STOP_CMD, CDT_DISABLE_CMD,
2458                        CDT_PURGE_CMD, CDT_HELP_CMD);
2459
2460         if (rc)
2461                 RETURN(rc);
2462
2463         RETURN(count);
2464 }
2465
2466 ssize_t hsm_control_show(struct kobject *kobj, struct attribute *attr,
2467                          char *buf)
2468 {
2469         struct obd_device *obd = container_of(kobj, struct obd_device,
2470                                               obd_kset.kobj);
2471         struct coordinator *cdt;
2472
2473         cdt = &(mdt_dev(obd->obd_lu_dev)->mdt_coordinator);
2474
2475         return scnprintf(buf, PAGE_SIZE, "%s\n",
2476                          cdt_mdt_state2str(cdt->cdt_state));
2477 }
2478
2479 static int
2480 mdt_hsm_request_mask_show(struct seq_file *m, __u64 mask)
2481 {
2482         bool first = true;
2483         int i;
2484         ENTRY;
2485
2486         for (i = 0; i < 8 * sizeof(mask); i++) {
2487                 if (mask & (1UL << i)) {
2488                         seq_printf(m, "%s%s", first ? "" : " ",
2489                                    hsm_copytool_action2name(i));
2490                         first = false;
2491                 }
2492         }
2493         seq_putc(m, '\n');
2494
2495         RETURN(0);
2496 }
2497
2498 static int
2499 mdt_hsm_user_request_mask_seq_show(struct seq_file *m, void *data)
2500 {
2501         struct mdt_device *mdt = m->private;
2502         struct coordinator *cdt = &mdt->mdt_coordinator;
2503
2504         return mdt_hsm_request_mask_show(m, cdt->cdt_user_request_mask);
2505 }
2506
2507 static int
2508 mdt_hsm_group_request_mask_seq_show(struct seq_file *m, void *data)
2509 {
2510         struct mdt_device *mdt = m->private;
2511         struct coordinator *cdt = &mdt->mdt_coordinator;
2512
2513         return mdt_hsm_request_mask_show(m, cdt->cdt_group_request_mask);
2514 }
2515
2516 static int
2517 mdt_hsm_other_request_mask_seq_show(struct seq_file *m, void *data)
2518 {
2519         struct mdt_device *mdt = m->private;
2520         struct coordinator *cdt = &mdt->mdt_coordinator;
2521
2522         return mdt_hsm_request_mask_show(m, cdt->cdt_other_request_mask);
2523 }
2524
2525 static inline enum hsm_copytool_action
2526 hsm_copytool_name2action(const char *name)
2527 {
2528         if (strcasecmp(name, "NOOP") == 0)
2529                 return HSMA_NONE;
2530         else if (strcasecmp(name, "ARCHIVE") == 0)
2531                 return HSMA_ARCHIVE;
2532         else if (strcasecmp(name, "RESTORE") == 0)
2533                 return HSMA_RESTORE;
2534         else if (strcasecmp(name, "REMOVE") == 0)
2535                 return HSMA_REMOVE;
2536         else if (strcasecmp(name, "CANCEL") == 0)
2537                 return HSMA_CANCEL;
2538         else
2539                 return -1;
2540 }
2541
2542 static ssize_t
2543 mdt_write_hsm_request_mask(struct file *file, const char __user *user_buf,
2544                             size_t user_count, __u64 *mask)
2545 {
2546         char *buf, *pos, *name;
2547         size_t buf_size;
2548         __u64 new_mask = 0;
2549         int rc;
2550         ENTRY;
2551
2552         if (!(user_count < 4096))
2553                 RETURN(-ENOMEM);
2554
2555         buf_size = user_count + 1;
2556
2557         OBD_ALLOC(buf, buf_size);
2558         if (buf == NULL)
2559                 RETURN(-ENOMEM);
2560
2561         if (copy_from_user(buf, user_buf, buf_size - 1))
2562                 GOTO(out, rc = -EFAULT);
2563
2564         buf[buf_size - 1] = '\0';
2565
2566         pos = buf;
2567         while ((name = strsep(&pos, " \t\v\n")) != NULL) {
2568                 int action;
2569
2570                 if (*name == '\0')
2571                         continue;
2572
2573                 action = hsm_copytool_name2action(name);
2574                 if (action < 0)
2575                         GOTO(out, rc = -EINVAL);
2576
2577                 new_mask |= (1UL << action);
2578         }
2579
2580         *mask = new_mask;
2581         rc = user_count;
2582 out:
2583         OBD_FREE(buf, buf_size);
2584
2585         RETURN(rc);
2586 }
2587
2588 static ssize_t
2589 mdt_hsm_user_request_mask_seq_write(struct file *file, const char __user *buf,
2590                                         size_t count, loff_t *off)
2591 {
2592         struct seq_file         *m = file->private_data;
2593         struct mdt_device       *mdt = m->private;
2594         struct coordinator *cdt = &mdt->mdt_coordinator;
2595
2596         return mdt_write_hsm_request_mask(file, buf, count,
2597                                            &cdt->cdt_user_request_mask);
2598 }
2599
2600 static ssize_t
2601 mdt_hsm_group_request_mask_seq_write(struct file *file, const char __user *buf,
2602                                         size_t count, loff_t *off)
2603 {
2604         struct seq_file         *m = file->private_data;
2605         struct mdt_device       *mdt = m->private;
2606         struct coordinator      *cdt = &mdt->mdt_coordinator;
2607
2608         return mdt_write_hsm_request_mask(file, buf, count,
2609                                            &cdt->cdt_group_request_mask);
2610 }
2611
2612 static ssize_t
2613 mdt_hsm_other_request_mask_seq_write(struct file *file, const char __user *buf,
2614                                         size_t count, loff_t *off)
2615 {
2616         struct seq_file         *m = file->private_data;
2617         struct mdt_device       *mdt = m->private;
2618         struct coordinator      *cdt = &mdt->mdt_coordinator;
2619
2620         return mdt_write_hsm_request_mask(file, buf, count,
2621                                            &cdt->cdt_other_request_mask);
2622 }
2623
2624 static ssize_t remove_archive_on_last_unlink_show(struct kobject *kobj,
2625                                                   struct attribute *attr,
2626                                                   char *buf)
2627 {
2628         struct coordinator *cdt = container_of(kobj, struct coordinator,
2629                                                cdt_hsm_kobj);
2630
2631         return scnprintf(buf, PAGE_SIZE, "%u\n",
2632                          cdt->cdt_remove_archive_on_last_unlink);
2633 }
2634
2635 static ssize_t remove_archive_on_last_unlink_store(struct kobject *kobj,
2636                                                    struct attribute *attr,
2637                                                    const char *buffer,
2638                                                    size_t count)
2639 {
2640         struct coordinator *cdt = container_of(kobj, struct coordinator,
2641                                                cdt_hsm_kobj);
2642         bool val;
2643         int rc;
2644
2645         rc = kstrtobool(buffer, &val);
2646         if (rc < 0)
2647                 return rc;
2648
2649         cdt->cdt_remove_archive_on_last_unlink = val;
2650         return count;
2651 }
2652 LUSTRE_RW_ATTR(remove_archive_on_last_unlink);
2653
/*
 * File operations of the three request mask debugfs entries.
 * NOTE(review): LDEBUGFS_SEQ_FOPS(name) presumably builds name##_fops from
 * the name##_seq_show() and name##_seq_write() handlers defined in this
 * file; the resulting *_fops objects are the ones referenced by
 * ldebugfs_mdt_hsm_vars[] - confirm against the macro definition.
 */
2654 LDEBUGFS_SEQ_FOPS(mdt_hsm_user_request_mask);
2655 LDEBUGFS_SEQ_FOPS(mdt_hsm_group_request_mask);
2656 LDEBUGFS_SEQ_FOPS(mdt_hsm_other_request_mask);
2657
2658 /* Read-only sysfs files for request counters */
2659 static ssize_t archive_count_show(struct kobject *kobj, struct attribute *attr,
2660                                   char *buf)
2661 {
2662         struct coordinator *cdt = container_of(kobj, struct coordinator,
2663                                                cdt_hsm_kobj);
2664
2665         return scnprintf(buf, PAGE_SIZE, "%d\n",
2666                          atomic_read(&cdt->cdt_archive_count));
2667 }
2668 LUSTRE_RO_ATTR(archive_count);
2669
2670 static ssize_t restore_count_show(struct kobject *kobj, struct attribute *attr,
2671                                   char *buf)
2672 {
2673         struct coordinator *cdt = container_of(kobj, struct coordinator,
2674                                                cdt_hsm_kobj);
2675
2676         return scnprintf(buf, PAGE_SIZE, "%d\n",
2677                          atomic_read(&cdt->cdt_restore_count));
2678 }
2679 LUSTRE_RO_ATTR(restore_count);
2680
2681 static ssize_t remove_count_show(struct kobject *kobj, struct attribute *attr,
2682                                  char *buf)
2683 {
2684         struct coordinator *cdt = container_of(kobj, struct coordinator,
2685                                                cdt_hsm_kobj);
2686
2687         return scnprintf(buf, PAGE_SIZE, "%d\n",
2688                          atomic_read(&cdt->cdt_remove_count));
2689 }
2690 LUSTRE_RO_ATTR(remove_count);
2691
2692 static struct ldebugfs_vars ldebugfs_mdt_hsm_vars[] = {
2693         { .name =       "agents",
2694           .fops =       &mdt_hsm_agent_fops                     },
2695         { .name =       "actions",
2696           .fops =       &mdt_hsm_actions_fops,
2697           .proc_mode =  0444                                    },
2698         { .name =       "policy",
2699           .fops =       &mdt_hsm_policy_fops                    },
2700         { .name =       "active_requests",
2701           .fops =       &mdt_hsm_active_requests_fops           },
2702         { .name =       "user_request_mask",
2703           .fops =       &mdt_hsm_user_request_mask_fops,        },
2704         { .name =       "group_request_mask",
2705           .fops =       &mdt_hsm_group_request_mask_fops,       },
2706         { .name =       "other_request_mask",
2707           .fops =       &mdt_hsm_other_request_mask_fops,       },
2708         { 0 }
2709 };
2710
/*
 * sysfs attributes of the coordinator "hsm" kobject: the coordinator
 * tunables followed by the archive/restore/remove request counters.
 * The array is NULL-terminated and is attached to hsm_ktype through the
 * attribute groups generated by KOBJ_ATTRIBUTE_GROUPS(hsm) below.
 */
2711 static struct attribute *hsm_attrs[] = {
2712         &lustre_attr_loop_period.attr,
2713         &lustre_attr_grace_delay.attr,
2714         &lustre_attr_active_request_timeout.attr,
2715         &lustre_attr_max_requests.attr,
2716         &lustre_attr_default_archive_id.attr,
2717         &lustre_attr_remove_archive_on_last_unlink.attr,
2718         &lustre_attr_archive_count.attr,
2719         &lustre_attr_restore_count.attr,
2720         &lustre_attr_remove_count.attr,
2721         NULL,
2722 };
2723
2724 KOBJ_ATTRIBUTE_GROUPS(hsm); /* creates hsm_groups from hsm_attrs */
2725
2726 static void hsm_kobj_release(struct kobject *kobj)
2727 {
2728         struct coordinator *cdt = container_of(kobj, struct coordinator,
2729                                                cdt_hsm_kobj);
2730
2731         debugfs_remove_recursive(cdt->cdt_debugfs_dir);
2732         cdt->cdt_debugfs_dir = NULL;
2733
2734         complete(&cdt->cdt_kobj_unregister);
2735 }
2736
2737 static struct kobj_type hsm_ktype = {
2738         .default_groups = KOBJ_ATTR_GROUPS(hsm),
2739         .sysfs_ops      = &lustre_sysfs_ops,
2740         .release        = hsm_kobj_release,
2741 };
2742
2743 /**
2744  * create sysfs entries for coordinator
2745  * \param mdt [IN]
2746  * \retval 0 success
2747  * \retval -ve failure
2748  */
2749 int hsm_cdt_tunables_init(struct mdt_device *mdt)
2750 {
2751         struct coordinator *cdt = &mdt->mdt_coordinator;
2752         struct obd_device *obd = mdt2obd_dev(mdt);
2753         int rc;
2754
2755         init_completion(&cdt->cdt_kobj_unregister);
2756         rc = kobject_init_and_add(&cdt->cdt_hsm_kobj, &hsm_ktype,
2757                                   &obd->obd_kset.kobj, "%s", "hsm");
2758         if (rc) {
2759                 kobject_put(&cdt->cdt_hsm_kobj);
2760                 return rc;
2761         }
2762
2763         /* init debugfs entries, failure is not critical */
2764         cdt->cdt_debugfs_dir = debugfs_create_dir("hsm",
2765                                                   obd->obd_debugfs_entry);
2766         ldebugfs_add_vars(cdt->cdt_debugfs_dir, ldebugfs_mdt_hsm_vars, mdt);
2767
2768         return 0;
2769 }
2770
2771 /**
2772  * remove sysfs entries for coordinator
2773  *
2774  * @mdt
2775  */
2776 void hsm_cdt_tunables_fini(struct mdt_device *mdt)
2777 {
2778         struct coordinator *cdt = &mdt->mdt_coordinator;
2779
2780         kobject_put(&cdt->cdt_hsm_kobj);
2781         wait_for_completion(&cdt->cdt_kobj_unregister);
2782 }