Whamcloud - gitweb
b7c4c930abea00f71bc6972509a789202e1f14ac
[fs/lustre-release.git] / lustre / mdt / mdt_hsm_cdt_agent.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License version 2 for more details.  A copy is
14  * included in the COPYING file that accompanied this code.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program; if not, write to the Free Software
18  * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
19  *
20  * GPL HEADER END
21  */
22 /*
23  * (C) Copyright 2012 Commissariat a l'energie atomique et aux energies
24  *     alternatives
25  *
26  */
27 /*
28  * lustre/mdt/mdt_hsm_cdt_agent.c
29  *
30  * Lustre HSM Coordinator
31  *
32  * Author: Jacques-Charles Lafoucriere <jacques-charles.lafoucriere@cea.fr>
33  * Author: Aurelien Degremont <aurelien.degremont@cea.fr>
34  */
35
36 #define DEBUG_SUBSYSTEM S_MDS
37
38 #include <obd.h>
39 #include <obd_support.h>
40 #include <lustre_export.h>
41 #include <lustre/lustre_user.h>
42 #include <lprocfs_status.h>
43 #include "mdt_internal.h"
44
45 /*
46  * Agent external API
47  */
48
49 /*
50  * find a hsm_agent by uuid
51  * lock cdt_agent_lock needs to be hold by caller
52  * \param cdt [IN] coordinator
53  * \param uuid [IN] agent UUID
54  * \retval hsm_agent pointer or NULL if not found
55  */
56 static struct hsm_agent *mdt_hsm_agent_lookup(struct coordinator *cdt,
57                                               const struct obd_uuid *uuid)
58 {
59         struct hsm_agent        *ha;
60         cfs_list_t              *pos;
61
62         cfs_list_for_each(pos, &cdt->cdt_agents) {
63                 ha = cfs_list_entry(pos, struct hsm_agent, ha_list);
64                 if (obd_uuid_equals(&ha->ha_uuid, uuid))
65                         return ha;
66         }
67         return NULL;
68 }
69
70 /**
71  * register a copy tool
72  * \param mti [IN] MDT context
73  * \param uuid [IN] client UUID to be registered
74  * \param count [IN] number of archives agent serves
75  * \param archive_id [IN] vector of archive number served by the copytool
76  * \retval 0 success
77  * \retval -ve failure
78  */
79 int mdt_hsm_agent_register(struct mdt_thread_info *mti,
80                            const struct obd_uuid *uuid,
81                            int nr_archives, __u32 *archive_id)
82 {
83         struct coordinator      *cdt = &mti->mti_mdt->mdt_coordinator;
84         struct hsm_agent        *ha, *tmp;
85         int                      rc;
86         ENTRY;
87
88         /* no coordinator started, so we cannot serve requests */
89         if (cdt->cdt_state == CDT_STOPPED) {
90                 LCONSOLE_WARN("HSM coordinator thread is not running - "
91                               "denying agent registration.\n");
92                 RETURN(-ENXIO);
93         }
94
95         OBD_ALLOC_PTR(ha);
96         if (ha == NULL)
97                 GOTO(out, rc = -ENOMEM);
98
99         ha->ha_uuid = *uuid;
100         ha->ha_archive_cnt = nr_archives;
101         if (ha->ha_archive_cnt != 0) {
102                 int sz;
103
104                 sz = ha->ha_archive_cnt * sizeof(*ha->ha_archive_id);
105                 OBD_ALLOC(ha->ha_archive_id, sz);
106                 if (ha->ha_archive_id == NULL)
107                         GOTO(out_free, rc = -ENOMEM);
108                 memcpy(ha->ha_archive_id, archive_id, sz);
109         }
110         atomic_set(&ha->ha_requests, 0);
111         atomic_set(&ha->ha_success, 0);
112         atomic_set(&ha->ha_failure, 0);
113
114         down_write(&cdt->cdt_agent_lock);
115         tmp = mdt_hsm_agent_lookup(cdt, uuid);
116         if (tmp != NULL) {
117                 LCONSOLE_WARN("HSM agent %s already registered\n",
118                               obd_uuid2str(uuid));
119                 up_write(&cdt->cdt_agent_lock);
120                 GOTO(out_free, rc = -EEXIST);
121         }
122
123         cfs_list_add_tail(&ha->ha_list, &cdt->cdt_agents);
124
125         if (ha->ha_archive_cnt == 0)
126                 CDEBUG(D_HSM, "agent %s registered for all archives\n",
127                        obd_uuid2str(&ha->ha_uuid));
128         else
129                 CDEBUG(D_HSM, "agent %s registered for %d archives\n",
130                        obd_uuid2str(&ha->ha_uuid), ha->ha_archive_cnt);
131
132         up_write(&cdt->cdt_agent_lock);
133         GOTO(out, rc = 0);
134
135 out_free:
136
137         if ((ha != NULL) && (ha->ha_archive_id != NULL))
138                 OBD_FREE(ha->ha_archive_id,
139                          ha->ha_archive_cnt * sizeof(*ha->ha_archive_id));
140         if (ha != NULL)
141                 OBD_FREE_PTR(ha);
142 out:
143         return rc;
144 }
145
146 /**
147  * register a copy tool
148  * \param mti [IN] MDT context
149  * \param uuid [IN] uuid to be registered
150  * \param archive_mask [IN] bitmask of archive number served by the copytool
151  * \retval 0 success
152  * \retval -ve failure
153  */
154 int mdt_hsm_agent_register_mask(struct mdt_thread_info *mti,
155                                 const struct obd_uuid *uuid, __u32 archive_mask)
156 {
157         int              rc, i, nr_archives = 0;
158         __u32           *archive_id = NULL;
159         ENTRY;
160
161         nr_archives = hweight32(archive_mask);
162
163         if (nr_archives != 0) {
164                 OBD_ALLOC(archive_id, nr_archives * sizeof(*archive_id));
165                 if (!archive_id)
166                         RETURN(-ENOMEM);
167
168                 nr_archives = 0;
169                 for (i = 0; i < sizeof(archive_mask) * 8; i++) {
170                         if ((1 << i) & archive_mask) {
171                                 archive_id[nr_archives] = i + 1;
172                                 nr_archives++;
173                         }
174                 }
175         }
176
177         rc = mdt_hsm_agent_register(mti, uuid, nr_archives, archive_id);
178
179         if (archive_id != NULL)
180                 OBD_FREE(archive_id, nr_archives * sizeof(*archive_id));
181
182         RETURN(rc);
183 }
184
185 /**
186  * unregister a copy tool
187  * \param mti [IN] MDT context
188  * \param uuid [IN] uuid to be unregistered
189  * \retval 0 success
190  * \retval -ve failure
191  */
192 int mdt_hsm_agent_unregister(struct mdt_thread_info *mti,
193                              const struct obd_uuid *uuid)
194 {
195         struct coordinator      *cdt = &mti->mti_mdt->mdt_coordinator;
196         struct hsm_agent        *ha;
197         int                      rc;
198         ENTRY;
199
200         /* no coordinator started, so we cannot serve requests */
201         if (cdt->cdt_state == CDT_STOPPED)
202                 RETURN(-ENXIO);
203
204         down_write(&cdt->cdt_agent_lock);
205
206         ha = mdt_hsm_agent_lookup(cdt, uuid);
207         if (ha != NULL)
208                 cfs_list_del_init(&ha->ha_list);
209
210         up_write(&cdt->cdt_agent_lock);
211
212         if (ha == NULL)
213                 GOTO(out, rc = -ENOENT);
214
215         if (ha->ha_archive_cnt != 0)
216                 OBD_FREE(ha->ha_archive_id,
217                          ha->ha_archive_cnt * sizeof(*ha->ha_archive_id));
218         OBD_FREE_PTR(ha);
219
220         GOTO(out, rc = 0);
221 out:
222         CDEBUG(D_HSM, "agent %s unregistration: %d\n", obd_uuid2str(uuid), rc);
223
224         return rc;
225 }
226
227 /**
228  * update agent statistics
229  * \param mdt [IN] MDT device
230  * \param succ_rq [IN] number of success
231  * \param fail_rq [IN] number of failure
232  * \param new_rq [IN] number of new requests
233  * \param uuid [IN] agent uuid
234  * if all counters == 0, clear counters
235  * \retval 0 success
236  * \retval -ve failure
237  */
238 int mdt_hsm_agent_update_statistics(struct coordinator *cdt,
239                                     int succ_rq, int fail_rq, int new_rq,
240                                     const struct obd_uuid *uuid)
241 {
242         struct hsm_agent        *ha, *tmp;
243         int                      rc;
244         ENTRY;
245
246         down_read(&cdt->cdt_agent_lock);
247         cfs_list_for_each_entry_safe(ha, tmp, &cdt->cdt_agents, ha_list) {
248                 if (obd_uuid_equals(&ha->ha_uuid, uuid)) {
249                         if ((succ_rq == 0) && (fail_rq == 0) && (new_rq == 0)) {
250                                 atomic_set(&ha->ha_success, 0);
251                                 atomic_set(&ha->ha_failure, 0);
252                                 atomic_set(&ha->ha_requests, 0);
253                         } else {
254                                 atomic_add(succ_rq, &ha->ha_success);
255                                 atomic_add(fail_rq, &ha->ha_failure);
256                                 atomic_add(new_rq, &ha->ha_requests);
257                                 atomic_sub(succ_rq, &ha->ha_requests);
258                                 atomic_sub(fail_rq, &ha->ha_requests);
259                         }
260                         GOTO(out, rc = 0);
261                 }
262
263         }
264         rc = -ENOENT;
265 out:
266         up_read(&cdt->cdt_agent_lock);
267         RETURN(rc);
268 }
269
270 /**
271  * find the best agent
272  * \param cdt [IN] coordinator
273  * \param archive [IN] archive number
274  * \param uuid [OUT] agent who can serve archive
275  * \retval 0 success
276  * \retval -ve failure
277  */
278 int mdt_hsm_find_best_agent(struct coordinator *cdt, __u32 archive,
279                             struct obd_uuid *uuid)
280 {
281         int                      rc = -EAGAIN, i, load = -1;
282         struct hsm_agent        *ha, *tmp;
283         ENTRY;
284
285         /* Choose an export to send a copytool req to */
286         down_read(&cdt->cdt_agent_lock);
287         cfs_list_for_each_entry_safe(ha, tmp, &cdt->cdt_agents, ha_list) {
288                 for (i = 0; (i < ha->ha_archive_cnt) &&
289                               (ha->ha_archive_id[i] != archive); i++) {
290                         /* nothing to do, just skip unmatching records */
291                 }
292
293                 /* archive count == 0 means copy tool serves any backend */
294                 if ((ha->ha_archive_cnt != 0) && (i == ha->ha_archive_cnt))
295                         continue;
296
297                 if ((load == -1) ||
298                     (load > atomic_read(&ha->ha_requests))) {
299                         load = atomic_read(&ha->ha_requests);
300                         *uuid = ha->ha_uuid;
301                         rc = 0;
302                 }
303                 if (atomic_read(&ha->ha_requests) == 0)
304                         break;
305         }
306         up_read(&cdt->cdt_agent_lock);
307
308         RETURN(rc);
309 }
310
311 /**
312  * send a compound request to the agent
313  * \param mti [IN] context
314  * \param hal [IN] request (can be a kuc payload)
315  * \param purge [IN] purge mode (no record)
316  * \retval 0 success
317  * \retval -ve failure
318  * This function supposes:
319  *  - all actions are for the same archive number
320  *  - in case of cancel, all cancel are for the same agent
321  * This implies that request split has to be done
322  *  before when building the hal
323  */
324 int mdt_hsm_agent_send(struct mdt_thread_info *mti,
325                        struct hsm_action_list *hal, bool purge)
326 {
327         struct obd_export       *exp;
328         struct mdt_device       *mdt = mti->mti_mdt;
329         struct coordinator      *cdt = &mti->mti_mdt->mdt_coordinator;
330         struct hsm_action_list  *buf = NULL;
331         struct hsm_action_item  *hai;
332         struct obd_uuid          uuid;
333         int                      len, i, rc = 0;
334         bool                     fail_request;
335         bool                     is_registered = false;
336         ENTRY;
337
338         rc = mdt_hsm_find_best_agent(cdt, hal->hal_archive_id, &uuid);
339         if (rc) {
340                 CERROR("%s: Cannot find agent for archive %d: rc = %d\n",
341                        mdt_obd_name(mdt), hal->hal_archive_id, rc);
342                 RETURN(rc);
343         }
344
345         CDEBUG(D_HSM, "Agent %s selected for archive %d\n", obd_uuid2str(&uuid),
346                hal->hal_archive_id);
347
348         len = hal_size(hal);
349         if (kuc_ispayload(hal)) {
350                 /* hal is already a kuc payload
351                  * we do not need to alloc a new one
352                  * this avoid a alloc/memcpy/free
353                  */
354                 buf = hal;
355         } else {
356                 buf = kuc_alloc(len, KUC_TRANSPORT_HSM, HMT_ACTION_LIST);
357                 if (IS_ERR(buf))
358                         RETURN(PTR_ERR(buf));
359                 memcpy(buf, hal, len);
360         }
361
362         /* Check if request is still valid (cf file hsm flags) */
363         fail_request = false;
364         hai = hai_zero(hal);
365         for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
366                 if (hai->hai_action != HSMA_CANCEL) {
367                         struct mdt_object *obj;
368                         struct md_hsm hsm;
369
370                         obj = mdt_hsm_get_md_hsm(mti, &hai->hai_fid, &hsm);
371                         if (!IS_ERR(obj)) {
372                                 mdt_object_put(mti->mti_env, obj);
373                         } else {
374                                 if (hai->hai_action == HSMA_REMOVE)
375                                         continue;
376
377                                 if (PTR_ERR(obj) == -ENOENT) {
378                                         fail_request = true;
379                                         rc = mdt_agent_record_update(
380                                                              mti->mti_env, mdt,
381                                                              &hai->hai_cookie,
382                                                              1, ARS_FAILED);
383                                         if (rc) {
384                                                 CERROR(
385                                               "%s: mdt_agent_record_update() "
386                                               "failed, rc=%d, cannot update "
387                                               "status to %s for cookie "
388                                               LPX64": rc = %d\n",
389                                               mdt_obd_name(mdt), rc,
390                                               agent_req_status2name(ARS_FAILED),
391                                               hai->hai_cookie, rc);
392                                                 GOTO(out_buf, rc);
393                                         }
394                                         continue;
395                                 }
396                                 GOTO(out_buf, rc = PTR_ERR(obj));
397                         }
398
399
400                         if (!mdt_hsm_is_action_compat(hai, hal->hal_archive_id,
401                                                       hal->hal_flags, &hsm)) {
402                                 /* incompatible request, we abort the request */
403                                 /* next time coordinator will wake up, it will
404                                  * make the same compound with valid only
405                                  * records */
406                                 fail_request = true;
407                                 rc = mdt_agent_record_update(mti->mti_env, mdt,
408                                                              &hai->hai_cookie,
409                                                              1, ARS_FAILED);
410                                 if (rc) {
411                                         CERROR("%s: mdt_agent_record_update() "
412                                               "failed, rc=%d, cannot update "
413                                               "status to %s for cookie "
414                                               LPX64": rc = %d\n",
415                                               mdt_obd_name(mdt), rc,
416                                               agent_req_status2name(ARS_FAILED),
417                                               hai->hai_cookie, rc);
418                                         GOTO(out_buf, rc);
419                                 }
420                         }
421                 }
422         }
423
424         /* we found incompatible requests, so the compound cannot be send
425          * as is. Bad records have been invalidated in llog.
426          * Valid one will be reschedule next time coordinator will wake up
427          * So no need the rebuild a full valid compound request now
428          */
429         if (fail_request)
430                 GOTO(out_buf, rc = 0);
431
432         /* Cancel memory registration is useless for purge
433          * non registration avoid a deadlock :
434          * in case of failure we have to take the write lock
435          * to remove entry which conflict with the read loack needed
436          * by purge
437          */
438         if (!purge) {
439                 /* set is_registered even if failure because we may have
440                  * partial work done */
441                 is_registered = true;
442                 rc = mdt_hsm_add_hal(mti, hal, &uuid);
443                 if (rc)
444                         GOTO(out_buf, rc);
445         }
446
447         /* Uses the ldlm reverse import; this rpc will be seen by
448          *  the ldlm_callback_handler. Note this sends a request RPC
449          * from a server (MDT) to a client (MDC), backwards of normal comms.
450          */
451         exp = cfs_hash_lookup(mdt2obd_dev(mdt)->obd_uuid_hash, &uuid);
452         if ((exp == NULL) || (exp->exp_disconnected)) {
453                 /* This should clean up agents on evicted exports */
454                 rc = -ENOENT;
455                 CERROR("%s: agent uuid (%s) not found, unregistering:"
456                        " rc = %d\n",
457                        mdt_obd_name(mdt), obd_uuid2str(&uuid), rc);
458                 mdt_hsm_agent_unregister(mti, &uuid);
459                 GOTO(out, rc);
460         }
461
462         /* send request to agent */
463         rc = do_set_info_async(exp->exp_imp_reverse, LDLM_SET_INFO,
464                                LUSTRE_OBD_VERSION,
465                                sizeof(KEY_HSM_COPYTOOL_SEND),
466                                KEY_HSM_COPYTOOL_SEND,
467                                kuc_len(len), kuc_ptr(buf), NULL);
468
469         if (rc)
470                 CERROR("%s: cannot send request to agent '%s': rc = %d\n",
471                        mdt_obd_name(mdt), obd_uuid2str(&uuid), rc);
472
473         class_export_put(exp);
474
475         if (rc == -EPIPE) {
476                 CDEBUG(D_HSM, "Lost connection to agent '%s', unregistering\n",
477                        obd_uuid2str(&uuid));
478                 mdt_hsm_agent_unregister(mti, &uuid);
479         }
480
481 out:
482         if (rc != 0 && is_registered) {
483                 /* in case of error, we have to unregister requests */
484                 hai = hai_zero(hal);
485                 for (i = 0; i < hal->hal_count; i++, hai = hai_next(hai)) {
486                         if (hai->hai_action == HSMA_CANCEL)
487                                 continue;
488                         mdt_cdt_remove_request(cdt, hai->hai_cookie);
489                 }
490         }
491
492 out_buf:
493         if (buf != hal)
494                 kuc_free(buf, len);
495
496         RETURN(rc);
497 }
498
499 /**
500  * update status of a request
501  * \param mti [IN]
502  * \param pgs [IN] progress of the copy tool
503  * \retval 0 success
504  * \retval -ve failure
505  */
506 int mdt_hsm_coordinator_update(struct mdt_thread_info *mti,
507                                struct hsm_progress_kernel *pgs)
508 {
509         int      rc;
510
511         ENTRY;
512         /* ask to coodinator to update request state and
513          * to record on disk the result */
514         rc = mdt_hsm_update_request_state(mti, pgs, 1);
515         RETURN(rc);
516 }
517
518 /**
519  * seq_file method called to start access to /proc file
520  */
521 static void *mdt_hsm_agent_proc_start(struct seq_file *s, loff_t *off)
522 {
523         struct mdt_device       *mdt = s->private;
524         struct coordinator      *cdt = &mdt->mdt_coordinator;
525         cfs_list_t              *pos;
526         loff_t                   i;
527         ENTRY;
528
529         down_read(&cdt->cdt_agent_lock);
530
531         if (cfs_list_empty(&cdt->cdt_agents))
532                 RETURN(NULL);
533
534         if (*off == 0)
535                 RETURN(SEQ_START_TOKEN);
536
537         i = 0;
538         cfs_list_for_each(pos, &cdt->cdt_agents) {
539                 i++;
540                 if (i >= *off)
541                         RETURN(pos);
542         }
543
544         RETURN(NULL);
545 }
546
547 /**
548  * seq_file method called to get next item
549  * just returns NULL at eof
550  */
551 static void *mdt_hsm_agent_proc_next(struct seq_file *s, void *v, loff_t *p)
552 {
553         struct mdt_device       *mdt = s->private;
554         struct coordinator      *cdt = &mdt->mdt_coordinator;
555         cfs_list_t              *pos = v;
556         ENTRY;
557
558         if (pos == SEQ_START_TOKEN)
559                 pos = cdt->cdt_agents.next;
560         else
561                 pos = pos->next;
562
563         (*p)++;
564         if (pos != &cdt->cdt_agents)
565                 RETURN(pos);
566
567         RETURN(NULL);
568 }
569
570 /**
571  */
572 static int mdt_hsm_agent_proc_show(struct seq_file *s, void *v)
573 {
574         cfs_list_t              *pos = v;
575         struct hsm_agent        *ha;
576         int                      i;
577         ENTRY;
578
579         if (pos == SEQ_START_TOKEN)
580                 RETURN(0);
581
582         ha = cfs_list_entry(pos, struct hsm_agent, ha_list);
583         seq_printf(s, "uuid=%s archive#=%d (", ha->ha_uuid.uuid,
584                    ha->ha_archive_cnt);
585         if (ha->ha_archive_cnt == 0)
586                 seq_printf(s, "all");
587         else
588                 for (i = 0; i < ha->ha_archive_cnt; i++)
589                         seq_printf(s, "%d ", ha->ha_archive_id[i]);
590
591         seq_printf(s, ") r=%d s=%d f=%d\n",
592                    atomic_read(&ha->ha_requests),
593                    atomic_read(&ha->ha_success),
594                    atomic_read(&ha->ha_failure));
595         RETURN(0);
596 }
597
598 /**
599  * seq_file method called to stop access to /proc file
600  */
601 static void mdt_hsm_agent_proc_stop(struct seq_file *s, void *v)
602 {
603         struct mdt_device       *mdt = s->private;
604         struct coordinator      *cdt = &mdt->mdt_coordinator;
605
606         up_read(&cdt->cdt_agent_lock);
607 }
608
609 /* hsm agent list proc functions */
610 static const struct seq_operations mdt_hsm_agent_proc_ops = {
611         .start  = mdt_hsm_agent_proc_start,
612         .next   = mdt_hsm_agent_proc_next,
613         .show   = mdt_hsm_agent_proc_show,
614         .stop   = mdt_hsm_agent_proc_stop,
615 };
616
617 /**
618  * public function called at open of /proc file to get
619  * list of agents
620  */
621 static int lprocfs_open_hsm_agent(struct inode *inode, struct file *file)
622 {
623         struct seq_file *s;
624         int              rc;
625         ENTRY;
626
627         if (LPROCFS_ENTRY_AND_CHECK(PDE(inode)))
628                         RETURN(-ENOENT);
629
630         rc = seq_open(file, &mdt_hsm_agent_proc_ops);
631         if (rc) {
632                 LPROCFS_EXIT();
633                 RETURN(rc);
634         }
635         s = file->private_data;
636         s->private = PDE(inode)->data;
637
638         RETURN(rc);
639 }
640
641 /* methods to access hsm agent list */
642 const struct file_operations mdt_hsm_agent_fops = {
643         .owner          = THIS_MODULE,
644         .open           = lprocfs_open_hsm_agent,
645         .read           = seq_read,
646         .llseek         = seq_lseek,
647         .release        = lprocfs_seq_release,
648 };
649