Whamcloud - gitweb
LU-3709 mdt: CDT cleanup follow-on patch
[fs/lustre-release.git] / lustre / mdt / mdt_hsm_cdt_agent.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * (C) Copyright 2012 Commissariat a l'energie atomique et aux energies
24  *     alternatives
25  *
26  */
27 /*
28  * lustre/mdt/mdt_hsm_cdt_agent.c
29  *
30  * Lustre HSM Coordinator
31  *
32  * Author: Jacques-Charles Lafoucriere <jacques-charles.lafoucriere@cea.fr>
33  * Author: Aurelien Degremont <aurelien.degremont@cea.fr>
34  */
35
36 #define DEBUG_SUBSYSTEM S_MDS
37
38 #include <obd.h>
39 #include <obd_support.h>
40 #include <lustre_export.h>
41 #include <lustre/lustre_user.h>
42 #include <lprocfs_status.h>
43 #include "mdt_internal.h"
44
45 /*
46  * Agent external API
47  */
48
49 /*
50  * find a hsm_agent by uuid
51  * lock cdt_agent_lock needs to be hold by caller
52  * \param cdt [IN] coordinator
53  * \param uuid [IN] agent UUID
54  * \retval hsm_agent pointer or NULL if not found
55  */
56 static struct hsm_agent *mdt_hsm_agent_lookup(struct coordinator *cdt,
57                                               const struct obd_uuid *uuid)
58 {
59         struct hsm_agent        *ha;
60
61         list_for_each_entry(ha, &cdt->cdt_agents, ha_list) {
62                 if (obd_uuid_equals(&ha->ha_uuid, uuid))
63                         return ha;
64         }
65         return NULL;
66 }
67
68 /**
69  * register a copy tool
70  * \param mti [IN] MDT context
71  * \param uuid [IN] client UUID to be registered
72  * \param count [IN] number of archives agent serves
73  * \param archive_id [IN] vector of archive number served by the copytool
74  * \retval 0 success
75  * \retval -ve failure
76  */
77 int mdt_hsm_agent_register(struct mdt_thread_info *mti,
78                            const struct obd_uuid *uuid,
79                            int nr_archives, __u32 *archive_id)
80 {
81         struct coordinator      *cdt = &mti->mti_mdt->mdt_coordinator;
82         struct hsm_agent        *ha, *tmp;
83         int                      rc;
84         ENTRY;
85
86         /* no coordinator started, so we cannot serve requests */
87         if (cdt->cdt_state == CDT_STOPPED) {
88                 LCONSOLE_WARN("HSM coordinator thread is not running - "
89                               "denying agent registration.\n");
90                 RETURN(-ENXIO);
91         }
92
93         OBD_ALLOC_PTR(ha);
94         if (ha == NULL)
95                 GOTO(out, rc = -ENOMEM);
96
97         ha->ha_uuid = *uuid;
98         ha->ha_archive_cnt = nr_archives;
99         if (ha->ha_archive_cnt != 0) {
100                 int sz;
101
102                 sz = ha->ha_archive_cnt * sizeof(*ha->ha_archive_id);
103                 OBD_ALLOC(ha->ha_archive_id, sz);
104                 if (ha->ha_archive_id == NULL)
105                         GOTO(out_free, rc = -ENOMEM);
106                 memcpy(ha->ha_archive_id, archive_id, sz);
107         }
108         atomic_set(&ha->ha_requests, 0);
109         atomic_set(&ha->ha_success, 0);
110         atomic_set(&ha->ha_failure, 0);
111
112         down_write(&cdt->cdt_agent_lock);
113         tmp = mdt_hsm_agent_lookup(cdt, uuid);
114         if (tmp != NULL) {
115                 LCONSOLE_WARN("HSM agent %s already registered\n",
116                               obd_uuid2str(uuid));
117                 up_write(&cdt->cdt_agent_lock);
118                 GOTO(out_free, rc = -EEXIST);
119         }
120
121         list_add_tail(&ha->ha_list, &cdt->cdt_agents);
122
123         if (ha->ha_archive_cnt == 0)
124                 CDEBUG(D_HSM, "agent %s registered for all archives\n",
125                        obd_uuid2str(&ha->ha_uuid));
126         else
127                 CDEBUG(D_HSM, "agent %s registered for %d archives\n",
128                        obd_uuid2str(&ha->ha_uuid), ha->ha_archive_cnt);
129
130         up_write(&cdt->cdt_agent_lock);
131         GOTO(out, rc = 0);
132
133 out_free:
134
135         if (ha != NULL && ha->ha_archive_id != NULL)
136                 OBD_FREE(ha->ha_archive_id,
137                          ha->ha_archive_cnt * sizeof(*ha->ha_archive_id));
138         if (ha != NULL)
139                 OBD_FREE_PTR(ha);
140 out:
141         return rc;
142 }
143
144 /**
145  * register a copy tool
146  * \param mti [IN] MDT context
147  * \param uuid [IN] uuid to be registered
148  * \param archive_mask [IN] bitmask of archive number served by the copytool
149  * \retval 0 success
150  * \retval -ve failure
151  */
152 int mdt_hsm_agent_register_mask(struct mdt_thread_info *mti,
153                                 const struct obd_uuid *uuid, __u32 archive_mask)
154 {
155         int              rc, i, nr_archives = 0;
156         __u32           *archive_id = NULL;
157         ENTRY;
158
159         nr_archives = hweight32(archive_mask);
160
161         if (nr_archives != 0) {
162                 OBD_ALLOC(archive_id, nr_archives * sizeof(*archive_id));
163                 if (!archive_id)
164                         RETURN(-ENOMEM);
165
166                 nr_archives = 0;
167                 for (i = 0; i < sizeof(archive_mask) * 8; i++) {
168                         if ((1 << i) & archive_mask) {
169                                 archive_id[nr_archives] = i + 1;
170                                 nr_archives++;
171                         }
172                 }
173         }
174
175         rc = mdt_hsm_agent_register(mti, uuid, nr_archives, archive_id);
176
177         if (archive_id != NULL)
178                 OBD_FREE(archive_id, nr_archives * sizeof(*archive_id));
179
180         RETURN(rc);
181 }
182
183 /**
184  * unregister a copy tool
185  * \param mti [IN] MDT context
186  * \param uuid [IN] uuid to be unregistered
187  * \retval 0 success
188  * \retval -ve failure
189  */
190 int mdt_hsm_agent_unregister(struct mdt_thread_info *mti,
191                              const struct obd_uuid *uuid)
192 {
193         struct coordinator      *cdt = &mti->mti_mdt->mdt_coordinator;
194         struct hsm_agent        *ha;
195         int                      rc;
196         ENTRY;
197
198         /* no coordinator started, so we cannot serve requests */
199         if (cdt->cdt_state == CDT_STOPPED)
200                 RETURN(-ENXIO);
201
202         down_write(&cdt->cdt_agent_lock);
203
204         ha = mdt_hsm_agent_lookup(cdt, uuid);
205         if (ha != NULL)
206                 list_del_init(&ha->ha_list);
207
208         up_write(&cdt->cdt_agent_lock);
209
210         if (ha == NULL)
211                 GOTO(out, rc = -ENOENT);
212
213         if (ha->ha_archive_cnt != 0)
214                 OBD_FREE(ha->ha_archive_id,
215                          ha->ha_archive_cnt * sizeof(*ha->ha_archive_id));
216         OBD_FREE_PTR(ha);
217
218         GOTO(out, rc = 0);
219 out:
220         CDEBUG(D_HSM, "agent %s unregistration: %d\n", obd_uuid2str(uuid), rc);
221
222         return rc;
223 }
224
225 /**
226  * update agent statistics
227  * \param mdt [IN] MDT device
228  * \param succ_rq [IN] number of success
229  * \param fail_rq [IN] number of failure
230  * \param new_rq [IN] number of new requests
231  * \param uuid [IN] agent uuid
232  * if all counters == 0, clear counters
233  * \retval 0 success
234  * \retval -ve failure
235  */
236 int mdt_hsm_agent_update_statistics(struct coordinator *cdt,
237                                     int succ_rq, int fail_rq, int new_rq,
238                                     const struct obd_uuid *uuid)
239 {
240         struct hsm_agent        *ha;
241         int                      rc;
242         ENTRY;
243
244         down_read(&cdt->cdt_agent_lock);
245         list_for_each_entry(ha, &cdt->cdt_agents, ha_list) {
246                 if (obd_uuid_equals(&ha->ha_uuid, uuid)) {
247                         if (succ_rq == 0 && fail_rq == 0 && new_rq == 0) {
248                                 atomic_set(&ha->ha_success, 0);
249                                 atomic_set(&ha->ha_failure, 0);
250                                 atomic_set(&ha->ha_requests, 0);
251                         } else {
252                                 atomic_add(succ_rq, &ha->ha_success);
253                                 atomic_add(fail_rq, &ha->ha_failure);
254                                 atomic_add(new_rq, &ha->ha_requests);
255                                 atomic_sub(succ_rq, &ha->ha_requests);
256                                 atomic_sub(fail_rq, &ha->ha_requests);
257                         }
258                         GOTO(out, rc = 0);
259                 }
260
261         }
262         rc = -ENOENT;
263 out:
264         up_read(&cdt->cdt_agent_lock);
265         RETURN(rc);
266 }
267
268 /**
269  * find the best agent
270  * \param cdt [IN] coordinator
271  * \param archive [IN] archive number
272  * \param uuid [OUT] agent who can serve archive
273  * \retval 0 success
274  * \retval -ve failure
275  */
276 int mdt_hsm_find_best_agent(struct coordinator *cdt, __u32 archive,
277                             struct obd_uuid *uuid)
278 {
279         int                      rc = -EAGAIN, i, load = -1;
280         struct hsm_agent        *ha;
281         ENTRY;
282
283         /* Choose an export to send a copytool req to */
284         down_read(&cdt->cdt_agent_lock);
285         list_for_each_entry(ha, &cdt->cdt_agents, ha_list) {
286                 for (i = 0; (i < ha->ha_archive_cnt) &&
287                               (ha->ha_archive_id[i] != archive); i++) {
288                         /* nothing to do, just skip unmatching records */
289                 }
290
291                 /* archive count == 0 means copy tool serves any backend */
292                 if (ha->ha_archive_cnt != 0 && i == ha->ha_archive_cnt)
293                         continue;
294
295                 if (load == -1 || load > atomic_read(&ha->ha_requests)) {
296                         load = atomic_read(&ha->ha_requests);
297                         *uuid = ha->ha_uuid;
298                         rc = 0;
299                 }
300                 if (atomic_read(&ha->ha_requests) == 0)
301                         break;
302         }
303         up_read(&cdt->cdt_agent_lock);
304
305         RETURN(rc);
306 }
307
308 /**
309  * send a compound request to the agent
310  * \param mti [IN] context
311  * \param hal [IN] request (can be a kuc payload)
312  * \param purge [IN] purge mode (no record)
313  * \retval 0 success
314  * \retval -ve failure
315  * This function supposes:
316  *  - all actions are for the same archive number
317  *  - in case of cancel, all cancel are for the same agent
318  * This implies that request split has to be done
319  *  before when building the hal
320  */
321 int mdt_hsm_agent_send(struct mdt_thread_info *mti,
322                        struct hsm_action_list *hal, bool purge)
323 {
324         struct obd_export       *exp;
325         struct mdt_device       *mdt = mti->mti_mdt;
326         struct coordinator      *cdt = &mti->mti_mdt->mdt_coordinator;
327         struct hsm_action_list  *buf = NULL;
328         struct hsm_action_item  *hai;
329         struct obd_uuid          uuid;
330         int                      len, i, rc = 0;
331         bool                     fail_request;
332         bool                     is_registered = false;
333         ENTRY;
334
335         rc = mdt_hsm_find_best_agent(cdt, hal->hal_archive_id, &uuid);
336         if (rc) {
337                 CERROR("%s: Cannot find agent for archive %d: rc = %d\n",
338                        mdt_obd_name(mdt), hal->hal_archive_id, rc);
339                 RETURN(rc);
340         }
341
342         CDEBUG(D_HSM, "Agent %s selected for archive %d\n", obd_uuid2str(&uuid),
343                hal->hal_archive_id);
344
345         len = hal_size(hal);
346         if (kuc_ispayload(hal)) {
347                 /* hal is already a kuc payload
348                  * we do not need to alloc a new one
349                  * this avoid a alloc/memcpy/free
350                  */
351                 buf = hal;
352         } else {
353                 buf = kuc_alloc(len, KUC_TRANSPORT_HSM, HMT_ACTION_LIST);
354                 if (IS_ERR(buf))
355                         RETURN(PTR_ERR(buf));
356                 memcpy(buf, hal, len);
357         }
358
359         /* Check if request is still valid (cf file hsm flags) */
360         fail_request = false;
361         hai = hai_first(hal);
362         for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
363                 if (hai->hai_action != HSMA_CANCEL) {
364                         struct mdt_object *obj;
365                         struct md_hsm hsm;
366
367                         obj = mdt_hsm_get_md_hsm(mti, &hai->hai_fid, &hsm);
368                         if (!IS_ERR(obj) && obj != NULL) {
369                                 mdt_object_put(mti->mti_env, obj);
370                         } else {
371                                 if (hai->hai_action == HSMA_REMOVE)
372                                         continue;
373
374                                 if (obj == NULL) {
375                                         fail_request = true;
376                                         rc = mdt_agent_record_update(
377                                                              mti->mti_env, mdt,
378                                                              &hai->hai_cookie,
379                                                              1, ARS_FAILED);
380                                         if (rc) {
381                                                 CERROR(
382                                               "%s: mdt_agent_record_update() "
383                                               "failed, cannot update "
384                                               "status to %s for cookie "
385                                               LPX64": rc = %d\n",
386                                               mdt_obd_name(mdt),
387                                               agent_req_status2name(ARS_FAILED),
388                                               hai->hai_cookie, rc);
389                                                 GOTO(out_buf, rc);
390                                         }
391                                         continue;
392                                 }
393                                 GOTO(out_buf, rc = PTR_ERR(obj));
394                         }
395
396                         if (!mdt_hsm_is_action_compat(hai, hal->hal_archive_id,
397                                                       hal->hal_flags, &hsm)) {
398                                 /* incompatible request, we abort the request */
399                                 /* next time coordinator will wake up, it will
400                                  * make the same compound with valid only
401                                  * records */
402                                 fail_request = true;
403                                 rc = mdt_agent_record_update(mti->mti_env, mdt,
404                                                              &hai->hai_cookie,
405                                                              1, ARS_FAILED);
406                                 if (rc) {
407                                         CERROR("%s: mdt_agent_record_update() "
408                                               "failed, cannot update "
409                                               "status to %s for cookie "
410                                               LPX64": rc = %d\n",
411                                               mdt_obd_name(mdt),
412                                               agent_req_status2name(ARS_FAILED),
413                                               hai->hai_cookie, rc);
414                                         GOTO(out_buf, rc);
415                                 }
416                         }
417                 }
418         }
419
420         /* we found incompatible requests, so the compound cannot be send
421          * as is. Bad records have been invalidated in llog.
422          * Valid one will be reschedule next time coordinator will wake up
423          * So no need the rebuild a full valid compound request now
424          */
425         if (fail_request)
426                 GOTO(out_buf, rc = 0);
427
428         /* Cancel memory registration is useless for purge
429          * non registration avoid a deadlock :
430          * in case of failure we have to take the write lock
431          * to remove entry which conflict with the read loack needed
432          * by purge
433          */
434         if (!purge) {
435                 /* set is_registered even if failure because we may have
436                  * partial work done */
437                 is_registered = true;
438                 rc = mdt_hsm_add_hal(mti, hal, &uuid);
439                 if (rc)
440                         GOTO(out_buf, rc);
441         }
442
443         /* Uses the ldlm reverse import; this rpc will be seen by
444          *  the ldlm_callback_handler. Note this sends a request RPC
445          * from a server (MDT) to a client (MDC), backwards of normal comms.
446          */
447         exp = cfs_hash_lookup(mdt2obd_dev(mdt)->obd_uuid_hash, &uuid);
448         if (exp == NULL || exp->exp_disconnected) {
449                 /* This should clean up agents on evicted exports */
450                 rc = -ENOENT;
451                 CERROR("%s: agent uuid (%s) not found, unregistering:"
452                        " rc = %d\n",
453                        mdt_obd_name(mdt), obd_uuid2str(&uuid), rc);
454                 mdt_hsm_agent_unregister(mti, &uuid);
455                 GOTO(out, rc);
456         }
457
458         /* send request to agent */
459         rc = do_set_info_async(exp->exp_imp_reverse, LDLM_SET_INFO,
460                                LUSTRE_OBD_VERSION,
461                                sizeof(KEY_HSM_COPYTOOL_SEND),
462                                KEY_HSM_COPYTOOL_SEND,
463                                kuc_len(len), kuc_ptr(buf), NULL);
464
465         if (rc)
466                 CERROR("%s: cannot send request to agent '%s': rc = %d\n",
467                        mdt_obd_name(mdt), obd_uuid2str(&uuid), rc);
468
469         class_export_put(exp);
470
471         if (rc == -EPIPE) {
472                 CDEBUG(D_HSM, "Lost connection to agent '%s', unregistering\n",
473                        obd_uuid2str(&uuid));
474                 mdt_hsm_agent_unregister(mti, &uuid);
475         }
476
477 out:
478         if (rc != 0 && is_registered) {
479                 /* in case of error, we have to unregister requests */
480                 hai = hai_first(hal);
481                 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
482                         if (hai->hai_action == HSMA_CANCEL)
483                                 continue;
484                         mdt_cdt_remove_request(cdt, hai->hai_cookie);
485                 }
486         }
487
488 out_buf:
489         if (buf != hal)
490                 kuc_free(buf, len);
491
492         RETURN(rc);
493 }
494
495 /**
496  * update status of a request
497  * \param mti [IN]
498  * \param pgs [IN] progress of the copy tool
499  * \retval 0 success
500  * \retval -ve failure
501  */
502 int mdt_hsm_coordinator_update(struct mdt_thread_info *mti,
503                                struct hsm_progress_kernel *pgs)
504 {
505         int      rc;
506
507         ENTRY;
508         /* ask to coodinator to update request state and
509          * to record on disk the result */
510         rc = mdt_hsm_update_request_state(mti, pgs, 1);
511         RETURN(rc);
512 }
513
514 /**
515  * seq_file method called to start access to /proc file
516  */
517 static void *mdt_hsm_agent_proc_start(struct seq_file *s, loff_t *off)
518 {
519         struct mdt_device       *mdt = s->private;
520         struct coordinator      *cdt = &mdt->mdt_coordinator;
521         struct list_head        *pos;
522         loff_t                   i;
523         ENTRY;
524
525         down_read(&cdt->cdt_agent_lock);
526
527         if (list_empty(&cdt->cdt_agents))
528                 RETURN(NULL);
529
530         if (*off == 0)
531                 RETURN(SEQ_START_TOKEN);
532
533         i = 0;
534         list_for_each(pos, &cdt->cdt_agents) {
535                 i++;
536                 if (i >= *off)
537                         RETURN(pos);
538         }
539
540         RETURN(NULL);
541 }
542
543 /**
544  * seq_file method called to get next item
545  * just returns NULL at eof
546  */
547 static void *mdt_hsm_agent_proc_next(struct seq_file *s, void *v, loff_t *p)
548 {
549         struct mdt_device       *mdt = s->private;
550         struct coordinator      *cdt = &mdt->mdt_coordinator;
551         struct list_head        *pos = v;
552         ENTRY;
553
554         if (pos == SEQ_START_TOKEN)
555                 pos = cdt->cdt_agents.next;
556         else
557                 pos = pos->next;
558
559         (*p)++;
560         if (pos != &cdt->cdt_agents)
561                 RETURN(pos);
562
563         RETURN(NULL);
564 }
565
566 /**
567  */
568 static int mdt_hsm_agent_proc_show(struct seq_file *s, void *v)
569 {
570         struct list_head        *pos = v;
571         struct hsm_agent        *ha;
572         int                      i;
573         ENTRY;
574
575         if (pos == SEQ_START_TOKEN)
576                 RETURN(0);
577
578         ha = list_entry(pos, struct hsm_agent, ha_list);
579         seq_printf(s, "uuid=%s archive#=%d (", ha->ha_uuid.uuid,
580                    ha->ha_archive_cnt);
581         if (ha->ha_archive_cnt == 0)
582                 seq_printf(s, "all");
583         else
584                 for (i = 0; i < ha->ha_archive_cnt; i++)
585                         seq_printf(s, "%d ", ha->ha_archive_id[i]);
586
587         seq_printf(s, ") r=%d s=%d f=%d\n",
588                    atomic_read(&ha->ha_requests),
589                    atomic_read(&ha->ha_success),
590                    atomic_read(&ha->ha_failure));
591         RETURN(0);
592 }
593
594 /**
595  * seq_file method called to stop access to /proc file
596  */
597 static void mdt_hsm_agent_proc_stop(struct seq_file *s, void *v)
598 {
599         struct mdt_device       *mdt = s->private;
600         struct coordinator      *cdt = &mdt->mdt_coordinator;
601
602         up_read(&cdt->cdt_agent_lock);
603 }
604
605 /* hsm agent list proc functions */
606 static const struct seq_operations mdt_hsm_agent_proc_ops = {
607         .start  = mdt_hsm_agent_proc_start,
608         .next   = mdt_hsm_agent_proc_next,
609         .show   = mdt_hsm_agent_proc_show,
610         .stop   = mdt_hsm_agent_proc_stop,
611 };
612
613 /**
614  * public function called at open of /proc file to get
615  * list of agents
616  */
617 static int lprocfs_open_hsm_agent(struct inode *inode, struct file *file)
618 {
619         struct seq_file *s;
620         int              rc;
621         ENTRY;
622
623         if (LPROCFS_ENTRY_AND_CHECK(PDE(inode)))
624                         RETURN(-ENOENT);
625
626         rc = seq_open(file, &mdt_hsm_agent_proc_ops);
627         if (rc) {
628                 LPROCFS_EXIT();
629                 RETURN(rc);
630         }
631         s = file->private_data;
632         s->private = PDE(inode)->data;
633
634         RETURN(rc);
635 }
636
637 /* methods to access hsm agent list */
638 const struct file_operations mdt_hsm_agent_fops = {
639         .owner          = THIS_MODULE,
640         .open           = lprocfs_open_hsm_agent,
641         .read           = seq_read,
642         .llseek         = seq_lseek,
643         .release        = lprocfs_seq_release,
644 };
645