+static int mdt_cdt_waiting_cb(const struct lu_env *env,
+ struct mdt_device *mdt,
+ struct llog_handle *llh,
+ struct llog_agent_req_rec *larr,
+ struct hsm_scan_data *hsd)
+{
+ struct coordinator *cdt = &mdt->mdt_coordinator;
+ struct hsm_scan_request *request;
+ struct hsm_action_item *hai;
+ size_t hai_size;
+ u32 archive_id;
+ bool wrapped;
+ int i;
+
+ /* Are agents full? */
+ if (atomic_read(&cdt->cdt_request_count) >= cdt->cdt_max_requests)
+ RETURN(hsd->hsd_housekeeping ? 0 : LLOG_PROC_BREAK);
+
+ if (hsd->hsd_action_count + atomic_read(&cdt->cdt_request_count) >=
+ cdt->cdt_max_requests) {
+ /* We cannot send any more request
+ *
+ * *** SPECIAL CASE ***
+ *
+ * Restore requests are too important not to schedule at least
+ * one, everytime we can.
+ */
+ if (larr->arr_hai.hai_action != HSMA_RESTORE ||
+ hsd->hsd_one_restore)
+ RETURN(hsd->hsd_housekeeping ? 0 : LLOG_PROC_BREAK);
+ }
+
+ hai_size = cfs_size_round(larr->arr_hai.hai_len);
+ archive_id = larr->arr_archive_id;
+
+ /* Can we add this action to one of the existing HALs in hsd. */
+ request = NULL;
+ for (i = 0; i < hsd->hsd_request_count; i++) {
+ if (hsd->hsd_request[i].hal->hal_archive_id == archive_id &&
+ hsd->hsd_request[i].hal_used_sz + hai_size <=
+ LDLM_MAXREQSIZE) {
+ request = &hsd->hsd_request[i];
+ break;
+ }
+ }
+
+ /* Are we trying to force-schedule a request? */
+ if (hsd->hsd_action_count + atomic_read(&cdt->cdt_request_count) >=
+ cdt->cdt_max_requests) {
+ /* Is there really no compatible hsm_scan_request? */
+ if (!request) {
+ for (i -= 1; i >= 0; i--) {
+ if (hsd->hsd_request[i].hal->hal_archive_id ==
+ archive_id) {
+ request = &hsd->hsd_request[i];
+ break;
+ }
+ }
+ }
+
+ /* Make room for the hai */
+ if (request) {
+ /* Discard the last hai until there is enough space */
+ do {
+ request->hal->hal_count--;
+
+ hai = hai_first(request->hal);
+ for (i = 0; i < request->hal->hal_count; i++)
+ hai = hai_next(hai);
+ request->hal_used_sz -=
+ cfs_size_round(hai->hai_len);
+ hsd->hsd_action_count--;
+ } while (request->hal_used_sz + hai_size >
+ LDLM_MAXREQSIZE);
+ } else if (hsd->hsd_housekeeping) {
+ struct hsm_scan_request *tmp;
+
+ /* Discard the (whole) last hal */
+ hsd->hsd_request_count--;
+ LASSERT(hsd->hsd_request_count >= 0);
+ tmp = &hsd->hsd_request[hsd->hsd_request_count];
+ hsd->hsd_action_count -= tmp->hal->hal_count;
+ LASSERT(hsd->hsd_action_count >= 0);
+ OBD_FREE(tmp->hal, tmp->hal_sz);
+ } else {
+ /* Bailing out, this code path is too hot */
+ RETURN(LLOG_PROC_BREAK);
+
+ }
+ }
+
+ if (!request) {
+ struct hsm_action_list *hal;
+
+ LASSERT(hsd->hsd_request_count < hsd->hsd_request_len);
+ request = &hsd->hsd_request[hsd->hsd_request_count];
+
+ /* allocates hai vector size just needs to be large
+ * enough */
+ request->hal_sz = sizeof(*request->hal) +
+ cfs_size_round(MTI_NAME_MAXLEN + 1) + 2 * hai_size;
+ OBD_ALLOC_LARGE(hal, request->hal_sz);
+ if (!hal)
+ RETURN(-ENOMEM);
+
+ hal->hal_version = HAL_VERSION;
+ strlcpy(hal->hal_fsname, hsd->hsd_fsname, MTI_NAME_MAXLEN + 1);
+ hal->hal_archive_id = larr->arr_archive_id;
+ hal->hal_flags = larr->arr_flags;
+ hal->hal_count = 0;
+ request->hal_used_sz = hal_size(hal);
+ request->hal = hal;
+ hsd->hsd_request_count++;
+ } else if (request->hal_sz < request->hal_used_sz + hai_size) {
+ /* Not enough room, need an extension */
+ void *hal_buffer;
+ int sz;
+
+ sz = min_t(int, 2 * request->hal_sz, LDLM_MAXREQSIZE);
+ LASSERT(request->hal_used_sz + hai_size < sz);
+
+ OBD_ALLOC_LARGE(hal_buffer, sz);
+ if (!hal_buffer)
+ RETURN(-ENOMEM);
+
+ memcpy(hal_buffer, request->hal, request->hal_used_sz);
+ OBD_FREE_LARGE(request->hal, request->hal_sz);
+ request->hal = hal_buffer;
+ request->hal_sz = sz;
+ }
+
+ hai = hai_first(request->hal);
+ for (i = 0; i < request->hal->hal_count; i++)
+ hai = hai_next(hai);
+
+ memcpy(hai, &larr->arr_hai, larr->arr_hai.hai_len);
+
+ request->hal_used_sz += hai_size;
+ request->hal->hal_count++;
+
+ hsd->hsd_action_count++;
+
+ switch (hai->hai_action) {
+ case HSMA_CANCEL:
+ break;
+ case HSMA_RESTORE:
+ hsd->hsd_one_restore = true;
+ fallthrough;
+ default:
+ cdt_agent_record_hash_add(cdt, hai->hai_cookie,
+ llh->lgh_hdr->llh_cat_idx,
+ larr->arr_hdr.lrh_index);
+ }
+
+ wrapped = llh->lgh_hdr->llh_cat_idx >= llh->lgh_last_idx &&
+ llh->lgh_hdr->llh_count > 1;
+ if ((!wrapped && llh->lgh_hdr->llh_cat_idx > hsd->hsd_start_cat_idx) ||
+ (wrapped && llh->lgh_hdr->llh_cat_idx < hsd->hsd_start_cat_idx) ||
+ (llh->lgh_hdr->llh_cat_idx == hsd->hsd_start_cat_idx &&
+ larr->arr_hdr.lrh_index > hsd->hsd_start_rec_idx)) {
+ hsd->hsd_start_cat_idx = llh->lgh_hdr->llh_cat_idx;
+ hsd->hsd_start_rec_idx = larr->arr_hdr.lrh_index;
+ }
+
+ RETURN(0);
+}
+
+static int mdt_cdt_started_cb(const struct lu_env *env,
+ struct mdt_device *mdt,
+ struct llog_handle *llh,
+ struct llog_agent_req_rec *larr,
+ struct hsm_scan_data *hsd)
+{
+ struct coordinator *cdt = &mdt->mdt_coordinator;
+ struct hsm_action_item *hai = &larr->arr_hai;
+ struct cdt_agent_req *car;
+ time64_t now = ktime_get_real_seconds();
+ time64_t last;
+ enum changelog_rec_flags clf_flags;
+ int rc;
+
+ if (!hsd->hsd_housekeeping)
+ RETURN(0);
+
+ /* we search for a running request
+ * error may happen if coordinator crashes or stopped
+ * with running request
+ */
+ car = mdt_cdt_find_request(cdt, hai->hai_cookie);
+ if (car == NULL) {
+ last = larr->arr_req_change;
+ } else {
+ last = car->car_req_update;
+ }
+
+ /* test if request too long, if yes cancel it
+ * the same way the copy tool acknowledge a cancel request */
+ if (now <= last + cdt->cdt_active_req_timeout)
+ GOTO(out_car, rc = 0);
+
+ dump_llog_agent_req_rec("request timed out, start cleaning", larr);
+
+ if (car != NULL) {
+ car->car_req_update = now;
+ mdt_hsm_agent_update_statistics(cdt, 0, 1, 0, &car->car_uuid);
+ /* Remove car from memory list (LU-9075) */
+ mdt_cdt_remove_request(cdt, hai->hai_cookie);
+ }
+
+ /* Emit a changelog record for the failed action.*/
+ clf_flags = 0;
+ hsm_set_cl_error(&clf_flags, ECANCELED);
+
+ switch (hai->hai_action) {
+ case HSMA_ARCHIVE:
+ hsm_set_cl_event(&clf_flags, HE_ARCHIVE);
+ break;
+ case HSMA_RESTORE:
+ hsm_set_cl_event(&clf_flags, HE_RESTORE);
+ break;
+ case HSMA_REMOVE:
+ hsm_set_cl_event(&clf_flags, HE_REMOVE);
+ break;
+ case HSMA_CANCEL:
+ hsm_set_cl_event(&clf_flags, HE_CANCEL);
+ break;
+ default:
+ /* Unknown record type, skip changelog. */
+ clf_flags = 0;
+ break;
+ }
+
+ if (clf_flags != 0)
+ mo_changelog(env, CL_HSM, clf_flags, mdt->mdt_child,
+ &hai->hai_fid);
+
+ if (hai->hai_action == HSMA_RESTORE)
+ cdt_restore_handle_del(hsd->hsd_mti, cdt, &hai->hai_fid);
+
+ larr->arr_status = ARS_CANCELED;
+ larr->arr_req_change = now;
+ rc = llog_write(hsd->hsd_mti->mti_env, llh, &larr->arr_hdr,
+ larr->arr_hdr.lrh_index);
+ if (rc < 0) {
+ CERROR("%s: cannot update agent log: rc = %d\n",
+ mdt_obd_name(mdt), rc);
+ rc = LLOG_DEL_RECORD;
+ }
+
+ /* ct has completed a request, so a slot is available,
+ * signal the coordinator to find new work */
+ mdt_hsm_cdt_event(cdt);
+out_car:
+ if (car != NULL)
+ mdt_cdt_put_request(car);
+
+ RETURN(rc);
+}
+