From 2982495a725d3f2ccd837b410110a00abea75693 Mon Sep 17 00:00:00 2001 From: George Raduta Date: Mon, 13 Oct 2025 15:01:06 +0200 Subject: [PATCH 1/5] Add Kafka based producer for beam mode changes --- .../mode/BeamModeEventsKafkaProducer.java | 79 +++++++++++++++++++ .../dip/kafka/KafkaProducerInterface.java | 64 +++++++++++++++ 2 files changed, 143 insertions(+) create mode 100644 src/main/java/alice/dip/beam/mode/BeamModeEventsKafkaProducer.java create mode 100644 src/main/java/alice/dip/kafka/KafkaProducerInterface.java diff --git a/src/main/java/alice/dip/beam/mode/BeamModeEventsKafkaProducer.java b/src/main/java/alice/dip/beam/mode/BeamModeEventsKafkaProducer.java new file mode 100644 index 0000000..1340842 --- /dev/null +++ b/src/main/java/alice/dip/beam/mode/BeamModeEventsKafkaProducer.java @@ -0,0 +1,79 @@ +/** + * @license + * Copyright CERN and copyright holders of ALICE O2. This software is + * distributed under the terms of the GNU General Public License v3 (GPL + * Version 3), copied verbatim in the file "COPYING". + * + * See http://alice-o2.web.cern.ch/license for full licensing information. + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ + +package alice.dip.beam.mode; + + +import org.apache.kafka.common.serialization.ByteArraySerializer; +import org.apache.kafka.common.serialization.IntegerSerializer; + +import alice.dip.adapters.BeamModeProtoAdapter; +import alice.dip.AliDip2BK; +import alice.dip.enums.BeamModeEnum; +import alice.dip.LhcInfoObj; +import alice.dip.kafka.KafkaProducerInterface; + +import ch.cern.alice.o2.control.common.Common; +import ch.cern.alice.o2.control.events.Events; + +/** + * Kafka producer for LHC Beam Mode events, serialized using Protocol Buffers. + */ +public class BeamModeEventsKafkaProducer extends KafkaProducerInterface { + public static final String KAFKA_PRODUCER_TOPIC_DIP = "dip.lhc.beam_mode"; + + /** + * Constructor to create a BeamModeEventsKafkaProducer + * @param bootstrapServers - Kafka bootstrap servers connection string in format of host:port + */ + public BeamModeEventsKafkaProducer(String bootstrapServers) { + super(bootstrapServers, KAFKA_PRODUCER_TOPIC_DIP, new IntegerSerializer(), new ByteArraySerializer()); + AliDip2BK.log(2, "BeamModeEventsKafkaProducer", "Initialized producer for topic: " + KAFKA_PRODUCER_TOPIC_DIP); + } + + /** + * Given a fill number for partitioning, a LhcInfoObj containing fill information, + * and a timestamp, creates and sends a proto serialized Beam Mode Event to the Kafka topic. + * @param fillNumber - fill number to be used for partition to ensure ordering + * @param fill - LhcInfoObj containing fill information + * @param timestamp - event timestamp at which the beam mode change event was received from DIP + */ + public void sendEvent(Integer fillNumber, LhcInfoObj fill, long timestamp) { + String beamModeStr = fill.getBeamMode(); + BeamModeEnum beamMode = BeamModeProtoAdapter.fromStringToEnum(beamModeStr); + + Common.BeamInfo beamInfo = Common.BeamInfo.newBuilder() + .setStableBeamsStart(fill.getStableBeamStart()) + .setStableBeamsEnd(fill.getStableBeamStop()) + .setFillNumber(fill.fillNo) + .setFillingSchemeName(fill.LHCFillingSchemeName) + .setBeamMode(Common.BeamMode.valueOf(beamMode.name())) + .setBeamType(fill.beamType) + .build(); + + Events.Ev_BeamModeEvent beamModeEvent = Events.Ev_BeamModeEvent.newBuilder() + .setTimestamp(timestamp) + .setBeamInfo(beamInfo) + .build(); + + Events.Event event = Events.Event.newBuilder() + .setTimestamp(timestamp) + .setTimestampNano((timestamp) * 1000000) + .setBeamModeEvent(beamModeEvent) + .build(); + byte[] value = event.toByteArray(); + + send(fillNumber, value); + AliDip2BK.log(2, "BeamModeEventsKafkaProducer", "Sent Beam Mode event for fill " + fill.fillNo + " with mode " + fill.getBeamMode() + " at timestamp " + timestamp); + } +} \ No newline at end of file diff --git a/src/main/java/alice/dip/kafka/KafkaProducerInterface.java b/src/main/java/alice/dip/kafka/KafkaProducerInterface.java new file mode 100644 index 0000000..b47623a --- /dev/null +++ b/src/main/java/alice/dip/kafka/KafkaProducerInterface.java @@ -0,0 +1,64 @@ +/** + * @license + * Copyright CERN and copyright holders of ALICE O2. This software is + * distributed under the terms of the GNU General Public License v3 (GPL + * Version 3), copied verbatim in the file "COPYING". + * + * See http://alice-o2.web.cern.ch/license for full licensing information. + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ + +package alice.dip.kafka; + +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.Serializer; + +import java.util.Properties; + +/** + * Generic Kafka Producer interface to send messages to a specified topic. + * @param - Type of the message key (to be used for partitioning) + * @param - Type of the message value (payload) + */ +public class KafkaProducerInterface implements AutoCloseable { + private final KafkaProducer producer; + private final String topic; + + /** + * Constructor to create a KafkaProducerInterface + * @param bootstrapServers - Kafka bootstrap servers connection string in format of host:port + * @param topic - Kafka topic to which messages will be sent + * @param keySerializer - Kafka supported serializer for the message key + * @param valueSerializer - Kafka supported serializer for the message value + */ + public KafkaProducerInterface(String bootstrapServers, String topic, Serializer keySerializer, Serializer valueSerializer) { + this.topic = topic; + Properties props = new Properties(); + props.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); + props.setProperty(ProducerConfig.ACKS_CONFIG, "all"); + this.producer = new KafkaProducer<>(props, keySerializer, valueSerializer); + } + + /** + * Send a message to the configured Kafka topic + * @param key - message key for partitioning + * @param value - message value (payload) + */ + public java.util.concurrent.Future send(K key, V value) { + ProducerRecord record = new ProducerRecord<>(topic, key, value); + return producer.send(record); + } + + /** + * Method to close the Kafka producer instance + */ + @Override + public void close() { + producer.close(); + } +} \ No newline at end of file From 460933c1c1e552af6f5e117ec93778409d2f98b7 Mon Sep 17 00:00:00 2001 From: George Raduta Date: Mon, 13 Oct 2025 15:01:21 +0200 Subject: [PATCH 2/5] Add beam mode dto, enum, proto files --- .../dip/adapters/BeamModeProtoAdapter.java | 40 +++++ .../java/alice/dip/enums/BeamModeEnum.java | 49 ++++++ src/main/proto/events.proto | 163 ++++++++++++++++++ src/main/proto/protos/common.proto | 93 ++++++++++ 4 files changed, 345 insertions(+) create mode 100644 src/main/java/alice/dip/adapters/BeamModeProtoAdapter.java create mode 100644 src/main/java/alice/dip/enums/BeamModeEnum.java create mode 100644 src/main/proto/events.proto create mode 100644 src/main/proto/protos/common.proto diff --git a/src/main/java/alice/dip/adapters/BeamModeProtoAdapter.java b/src/main/java/alice/dip/adapters/BeamModeProtoAdapter.java new file mode 100644 index 0000000..9369bfd --- /dev/null +++ b/src/main/java/alice/dip/adapters/BeamModeProtoAdapter.java @@ -0,0 +1,40 @@ +/** + * @license + * Copyright CERN and copyright holders of ALICE O2. This software is + * distributed under the terms of the GNU General Public License v3 (GPL + * Version 3), copied verbatim in the file "COPYING". + * + * See http://alice-o2.web.cern.ch/license for full licensing information. + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ + +package alice.dip.adapters; + +import alice.dip.enums.BeamModeEnum; + +/** + * Adapter class to convert between string representations of beam modes and the BeamModeEnum. + */ +public class BeamModeProtoAdapter { + + /** + * Returns the enum constant matching the given string, or UNKNOWN if not found. + * Accepts both space and underscore separated names, case-insensitive. + * @param beamMode The beam mode string to convert. + * @return The corresponding BeamModeEnum constant, or UNKNOWN if not recognized. + */ + public static BeamModeEnum fromStringToEnum(String beamMode) { + if (beamMode == null || beamMode.trim().isEmpty()) { + return BeamModeEnum.UNKNOWN; + } + for (BeamModeEnum value : BeamModeEnum.values()) { + if (value.label.equalsIgnoreCase(beamMode)) { + return value; + } + } + return BeamModeEnum.UNKNOWN; + } +} diff --git a/src/main/java/alice/dip/enums/BeamModeEnum.java b/src/main/java/alice/dip/enums/BeamModeEnum.java new file mode 100644 index 0000000..873d8d9 --- /dev/null +++ b/src/main/java/alice/dip/enums/BeamModeEnum.java @@ -0,0 +1,49 @@ +/** + * @license + * Copyright CERN and copyright holders of ALICE O2. This software is + * distributed under the terms of the GNU General Public License v3 (GPL + * Version 3), copied verbatim in the file "COPYING". + * + * See http://alice-o2.web.cern.ch/license for full licensing information. + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ + +package alice.dip.enums; + +/** + * Java enum matching the BeamMode values from DIP service and common.proto + * @enum BeamModeEnum + */ +public enum BeamModeEnum { + UNKNOWN("UNKNOWN"), + SETUP("SETUP"), + ABORT("ABORT"), + INJECTION_PROBE_BEAM("INJECTION PROBE BEAM"), + INJECTION_SETUP_BEAM("INJECTION SETUP BEAM"), + INJECTION_PHYSICS_BEAM("INJECTION PHYSICS BEAM"), + PREPARE_RAMP("PREPARE RAMP"), + RAMP("RAMP"), + FLAT_TOP("FLAT TOP"), + SQUEEZE("SQUEEZE"), + ADJUST("ADJUST"), + STABLE_BEAMS("STABLE BEAMS"), + LOST_BEAMS("LOST BEAMS"), + UNSTABLE_BEAMS("UNSTABLE BEAMS"), + BEAM_DUMP_WARNING("BEAM DUMP WARNING"), + BEAM_DUMP("BEAM DUMP"), + RAMP_DOWN("RAMP DOWN"), + CYCLING("CYCLING"), + RECOVERY("RECOVERY"), + INJECT_AND_DUMP("INJECT AND DUMP"), + CIRCULATE_AND_DUMP("CIRCULATE AND DUMP"), + NO_BEAM("NO BEAM"); + + public final String label; + + private BeamModeEnum(String label) { + this.label = label; + } +} diff --git a/src/main/proto/events.proto b/src/main/proto/events.proto new file mode 100644 index 0000000..2ec26e6 --- /dev/null +++ b/src/main/proto/events.proto @@ -0,0 +1,163 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2024 CERN and copyright holders of ALICE O². + * Author: Teo Mrnjavac + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + +syntax = "proto3"; + +package events; +option java_package = "ch.cern.alice.o2.control.events"; +option go_package = "github.com/AliceO2Group/Control/common/protos;pb"; + +import public "protos/common.proto"; + +//////////////// Common event messages /////////////// + +enum OpStatus { + NULL = 0; + STARTED = 1; + ONGOING = 2; + DONE_OK = 3; + DONE_ERROR = 4; + DONE_TIMEOUT = 5; +} + +message Ev_MetaEvent_MesosHeartbeat { +} + +message Ev_MetaEvent_CoreStart { + string frameworkId = 1; +} + +message Ev_MetaEvent_FrameworkEvent { + string frameworkId = 1; + string message = 2; +} + +message Ev_EnvironmentEvent { + string environmentId = 1; + string state = 2; + uint32 runNumber = 3; // only when the environment is in the running state + string error = 4; + string message = 5; // any additional message concerning the current state or transition + string transition = 6; + string transitionStep = 7; + OpStatus transitionStatus = 8; + map vars = 9; // consolidated environment variables at the root role of the environment + common.User lastRequestUser = 10; + common.WorkflowTemplateInfo workflowTemplateInfo = 11; +} + +message Traits { + string trigger = 1; + string await = 2; + string timeout = 3; + bool critical = 4; +} + +message Ev_TaskEvent { + string name = 1; // task name, based on the name of the task class + string taskid = 2; // task id, unique + string state = 3; // state machine state for this task + string status = 4; // posible values: ACTIVE/INACTIVE/PARTIAL/UNDEFINED/UNDEPLOYABLE as defined in status.go. + string hostname = 5; + string className = 6; // name of the task class from which this task was spawned + Traits traits = 7; + string environmentId = 8; + string path = 9; // path to the parent taskRole of this task within the environment +} + +message Ev_CallEvent { + string func = 1; // name of the function being called, within the workflow template context + OpStatus callStatus = 2; // progress or success/failure state of the call + string return = 3; // return value of the function + Traits traits = 4; + string output = 5; // any additional output of the function + string error = 6; // error value, if returned + string environmentId = 7; + string path = 8; // path to the parent callRole of this call within the environment +} + +message Ev_RoleEvent { + string name = 1; // role name + string status = 2; // posible values: ACTIVE/INACTIVE/PARTIAL/UNDEFINED/UNDEPLOYABLE as defined in status.go. Derived from the state of child tasks, calls or other roles + string state = 3; // state machine state for this role + string rolePath = 4; // path to this role within the environment + string environmentId = 5; +} + +message Ev_IntegratedServiceEvent { + string name = 1; // name of the context, usually the path of the callRole that calls a given integrated service function e.g. readout-dataflow.dd-scheduler.terminate + string error = 2; // error message, if any + string operationName = 3; // name of the operation, usually the name of the integrated service function being called e.g. ddsched.PartitionTerminate()" + OpStatus operationStatus = 4; // progress or success/failure state of the operation + string operationStep = 5; // if the operation has substeps, this is the name of the current substep, like an API call or polling phase + OpStatus operationStepStatus = 6; // progress or success/failure state of the current substep + string environmentId = 7; + string payload = 8; // any additional payload, depending on the integrated service; there is no schema, it can even be the raw return structure of a remote API call +} + +message Ev_RunEvent { + string environmentId = 1; + uint32 runNumber = 2; + string state = 3; + string error = 4; + string transition = 5; + OpStatus transitionStatus = 6; + reserved 7; // 7 was used for `vars` field that was removed + common.User lastRequestUser = 8; +} + +/** + * Beam mode changes are propagated as Kafka events and to be sent by the BKP-LHC-Client on a dedicated topic + * e.g. dip.lhc.beam_mode + */ +message Ev_BeamModeEvent { + int64 timestamp = 1; // milliseconds since epoch when the beam mode change happened + common.BeamInfo beamInfo = 2; +} + +message Event { + int64 timestamp = 1; + int64 timestampNano = 2; + reserved 3 to 10; + reserved 17 to 100; + reserved 104 to 199; + + oneof Payload { + // Events produced by AliECS + Ev_EnvironmentEvent environmentEvent = 11; + Ev_TaskEvent taskEvent = 12; + Ev_RoleEvent roleEvent = 13; + Ev_CallEvent callEvent = 14; + Ev_IntegratedServiceEvent integratedServiceEvent = 15; + Ev_RunEvent runEvent = 16; + + // Meta events produced by AliECS or its components + Ev_MetaEvent_FrameworkEvent frameworkEvent = 101; + Ev_MetaEvent_MesosHeartbeat mesosHeartbeatEvent = 102; + Ev_MetaEvent_CoreStart coreStartEvent = 103; + + // Events produced by other systems, but natively supported and defined by AliECS + Ev_BeamModeEvent beamModeEvent = 200; + } +} \ No newline at end of file diff --git a/src/main/proto/protos/common.proto b/src/main/proto/protos/common.proto new file mode 100644 index 0000000..f1e7787 --- /dev/null +++ b/src/main/proto/protos/common.proto @@ -0,0 +1,93 @@ +/* + * === This file is part of ALICE O² === + * + * Copyright 2024 CERN and copyright holders of ALICE O². + * Author: Teo Mrnjavac + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the GNU General Public License + * along with this program. If not, see . + * + * In applying this license CERN does not waive the privileges and + * immunities granted to it by virtue of its status as an + * Intergovernmental Organization or submit itself to any jurisdiction. + */ + + +syntax = "proto3"; + +package common; +option java_package = "ch.cern.alice.o2.control.common"; +option go_package = "github.com/AliceO2Group/Control/common/protos;pb"; + +//////////////// Common types /////////////// + +message User { + // The unique CERN identifier of this user. + optional int32 externalId = 1; + // The unique identifier of this entity. + optional int32 id = 2; + // Name of the user. + string name = 3; +} + +message WorkflowTemplateInfo { + string name = 1; + string description = 2; + string path = 3; + bool public = 4; // whether the environment is public or not +} + +/** + * Beam information at a specific point in time (e.g. start or end of stable beams) + */ +message BeamInfo { + int64 stableBeamsStart = 1; // milliseconds since epoch when stable beams started + int64 stableBeamsEnd = 2; // milliseconds since epoch when stable beams ended + int32 fillNumber = 3; // LHC fill number + string fillingSchemeName = 4; // LHC filling scheme name e.g. 25ns_2460b_2448_2089_2227_144bpi_20inj + float beam1Energy = 5; // in GeV + float beam2Energy = 6; // in GeV + string beamType = 7; // e.g. PROTON-PROTON, O8-O8, Pb-Pb, p-Pb, Pb-p + BeamMode beamMode = 8; +} + +/** + * Beam modes as defined and sent by LHC DIP client plus: + * * virtual type LOST_BEAMS - that is generated when beam 1 and beam 2 energy values are not equal anymore as per LHC DIP track: dip/acc/LHC/RunControl/SafeBeam + * * virtual type UNKNOWN - that is generated when there is no beam in the machine or value not added by the BKP-LHC Client + * Source of Beam Modes: https://lhc-commissioning.web.cern.ch/systems/data-exchange/doc/LHC-OP-ES-0005-10-00.pdf + */ +enum BeamMode { + UNKNOWN = 0; // virtual type + SETUP = 1; + ABORT = 2; + INJECTION_PROBE_BEAM = 3; + INJECTION_SETUP_BEAM = 4; + INJECTION_PHYSICS_BEAM = 5; + PREPARE_RAMP = 6; + RAMP = 7; + FLAT_TOP = 8; + SQUEEZE = 9; + ADJUST = 10; + STABLE_BEAMS = 11; + LOST_BEAMS = 12; // virtual type + UNSTABLE_BEAMS = 13; + BEAM_DUMP_WARNING = 14; + BEAM_DUMP = 15; + RAMP_DOWN = 16; + CYCLING = 17; + RECOVERY = 18; + INJECT_AND_DUMP = 19; + CIRCULATE_AND_DUMP = 20; + NO_BEAM = 21; +} From 1a31bbbc22608c6e069f6e35391636f8959d5e5f Mon Sep 17 00:00:00 2001 From: George Raduta Date: Mon, 13 Oct 2025 15:02:20 +0200 Subject: [PATCH 3/5] Make use of new producers --- src/main/java/alice/dip/AliDip2BK.java | 26 +++-- .../java/alice/dip/DipMessagesProcessor.java | 97 ++++++++++--------- .../adapters/BeamModeProtoAdapterTest.java | 56 +++++++++++ 3 files changed, 124 insertions(+), 55 deletions(-) create mode 100644 src/test/java/alice/dip/adapters/BeamModeProtoAdapterTest.java diff --git a/src/main/java/alice/dip/AliDip2BK.java b/src/main/java/alice/dip/AliDip2BK.java index ba7073b..22fab3d 100644 --- a/src/main/java/alice/dip/AliDip2BK.java +++ b/src/main/java/alice/dip/AliDip2BK.java @@ -1,12 +1,15 @@ -/************* - * cil - **************/ - -/* - * Main Class +/** + * @license + * Copyright CERN and copyright holders of ALICE O2. This software is + * distributed under the terms of the GNU General Public License v3 (GPL + * Version 3), copied verbatim in the file "COPYING". + * + * See http://alice-o2.web.cern.ch/license for full licensing information. * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. */ - package alice.dip; import java.io.BufferedWriter; @@ -18,8 +21,10 @@ import java.util.Date; import java.util.Properties; +import alice.dip.beam.mode.BeamModeEventsKafkaProducer; + public class AliDip2BK implements Runnable { - public static String Version = "2.1.2 22-Jul-2025"; + public static String Version = "3.0.0 13-Oct-2025"; public static String DNSnode = "dipnsdev.cern.ch"; public static String[] endFillCases = {"CUCU"}; public static boolean LIST_PARAM = false; @@ -52,6 +57,7 @@ public class AliDip2BK implements Runnable { BookkeepingClient bookkeepingClient; StartOfRunKafkaConsumer kcs; EndOfRunKafkaConsumer kce; + BeamModeEventsKafkaProducer beamModeEventsKafkaProducer; public AliDip2BK() { startDate = (new Date()).getTime(); @@ -82,6 +88,8 @@ public AliDip2BK() { kcs = new StartOfRunKafkaConsumer(dipMessagesProcessor); kce = new EndOfRunKafkaConsumer(dipMessagesProcessor); + beamModeEventsKafkaProducer = new BeamModeEventsKafkaProducer(AliDip2BK.bootstrapServers); + dipMessagesProcessor.setEventsProducer(beamModeEventsKafkaProducer); shutdownProc(); @@ -145,6 +153,8 @@ public void run() { } dipMessagesProcessor.saveState(); writeStat("AliDip2BK.stat", true); + beamModeEventsKafkaProducer.close(); + log(4, "AliDip2BK", "Beam Mode Events Kafka Producer closed"); } }); } diff --git a/src/main/java/alice/dip/DipMessagesProcessor.java b/src/main/java/alice/dip/DipMessagesProcessor.java index 79000fb..1578198 100644 --- a/src/main/java/alice/dip/DipMessagesProcessor.java +++ b/src/main/java/alice/dip/DipMessagesProcessor.java @@ -1,7 +1,15 @@ -/************* - * cil - **************/ - +/** + * @license + * Copyright CERN and copyright holders of ALICE O2. This software is + * distributed under the terms of the GNU General Public License v3 (GPL + * Version 3), copied verbatim in the file "COPYING". + * + * See http://alice-o2.web.cern.ch/license for full licensing information. + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ package alice.dip; import java.io.BufferedWriter; @@ -23,6 +31,8 @@ import cern.dip.DipData; import cern.dip.TypeMismatch; +import alice.dip.beam.mode.BeamModeEventsKafkaProducer; + /* * Process dip messages received from the DipClient * Receives DipData messages in a blocking Queue and then process them asynchronously @@ -45,11 +55,13 @@ public class DipMessagesProcessor implements Runnable { private BlockingQueue outputQueue = new ArrayBlockingQueue<>(100); private final LuminosityManager luminosityManager; + private volatile BeamModeEventsKafkaProducer beamModeEventsKafkaProducer; public DipMessagesProcessor(BookkeepingClient bookkeepingClient, LuminosityManager luminosityManager) { this.bookkeepingClient = bookkeepingClient; this.luminosityManager = luminosityManager; + this.beamModeEventsKafkaProducer = null; Thread t = new Thread(this); t.start(); @@ -58,6 +70,14 @@ public DipMessagesProcessor(BookkeepingClient bookkeepingClient, LuminosityManag loadState(); } + /** + * Setter of events producer + * @param beamModeEventsKafkaProducer - instance of BeamModeEventsKafkaProducer to be used to send events + */ + public void setEventsProducer(BeamModeEventsKafkaProducer beamModeEventsKafkaProducer) { + this.beamModeEventsKafkaProducer = beamModeEventsKafkaProducer; + } + /* * This method is used for receiving DipData messages from the Dip Client */ @@ -299,25 +319,25 @@ private void handleSafeBeamMessage(DipData dipData) throws BadParameter, TypeMis if (currentFill == null) return; String bm = currentFill.getBeamMode(); - - if (bm.contentEquals("STABLE BEAMS")) { - AliDip2BK.log( - 0, - "ProcData.newSafeBeams", - " VAL=" + safeBeamPayload + " isB1=" + isBeam1 + " isB2=" + isBeam2 + " isSB=" + isStableBeams - ); - - if (!isBeam1 || !isBeam2) { + AliDip2BK.log( + 1, + "ProcData.newSafeBeams", + " VAL=" + safeBeamPayload + " isB1=" + isBeam1 + " isB2=" + isBeam2 + " isSB=" + isStableBeams + ); + if (bm != null) { + if ((bm.contentEquals("STABLE BEAMS") && (!isBeam1 || !isBeam2))) { currentFill.setBeamMode(time, "LOST BEAMS"); + if (this.beamModeEventsKafkaProducer != null) { + this.beamModeEventsKafkaProducer.sendEvent(currentFill.fillNo, currentFill, time); + } AliDip2BK.log(5, "ProcData.newSafeBeams", " CHANGE BEAM MODE TO LOST BEAMS !!! "); + } else if (bm.contentEquals("LOST BEAMS") && isBeam1 && isBeam2) { + currentFill.setBeamMode(time, "STABLE BEAMS"); + if (this.beamModeEventsKafkaProducer != null) { + this.beamModeEventsKafkaProducer.sendEvent(currentFill.fillNo, currentFill, time); + } + AliDip2BK.log(5, "ProcData.newSafeBeams", " RECOVER FROM BEAM LOST TO STABLE BEAMS "); } - - return; - } - - if (bm.contentEquals("LOST BEAMS") && isBeam1 && isBeam2) { - currentFill.setBeamMode(time, "STABLE BEAMS"); - AliDip2BK.log(5, "ProcData.newSafeBeams", " RECOVER FROM BEAM LOST TO STABLE BEAMS "); } } @@ -569,35 +589,18 @@ public void newFillNo(long date, String strFno, String par1, String par2, String } public void newBeamMode(long date, String BeamMode) { - if (currentFill != null) { + AliDip2BK.log( + 2, + "ProcData.newBeamMode", + "New beam mode=" + BeamMode + " for FILL_NO=" + currentFill.fillNo + ); currentFill.setBeamMode(date, BeamMode); + bookkeepingClient.updateLhcFill(currentFill); + saveState(); - int mc = -1; - for (int i = 0; i < AliDip2BK.endFillCases.length; i++) { - if (AliDip2BK.endFillCases[i].equalsIgnoreCase(BeamMode)) mc = i; - } - if (mc < 0) { - - AliDip2BK.log( - 2, - "ProcData.newBeamMode", - "New beam mode=" + BeamMode + " for FILL_NO=" + currentFill.fillNo - ); - bookkeepingClient.updateLhcFill(currentFill); - saveState(); - } else { - currentFill.endedTime = date; - bookkeepingClient.updateLhcFill(currentFill); - if (AliDip2BK.KEEP_FILLS_HISTORY_DIRECTORY != null) { - writeFillHistFile(currentFill); - } - AliDip2BK.log( - 3, - "ProcData.newBeamMode", - "CLOSE Fill_NO=" + currentFill.fillNo + " Based on new beam mode=" + BeamMode - ); - currentFill = null; + if (this.beamModeEventsKafkaProducer != null) { + this.beamModeEventsKafkaProducer.sendEvent(currentFill.fillNo, currentFill, date); } } else { AliDip2BK.log(4, "ProcData.newBeamMode", " ERROR new beam mode=" + BeamMode + " NO FILL NO for it"); @@ -753,7 +756,7 @@ private void handleBookkeepingCtpClockMessage(DipData dipData) throws BadParamet var phaseShiftBeam2 = dipData.extractFloat("PhaseShift_Beam2"); AliDip2BK.log( - 2, + 0, "ProcData.dispatch", " Bookkeeping CTP Clock: PhaseShift_Beam1=" + phaseShiftBeam1 + " PhaseShift_Beam2=" + phaseShiftBeam2 ); diff --git a/src/test/java/alice/dip/adapters/BeamModeProtoAdapterTest.java b/src/test/java/alice/dip/adapters/BeamModeProtoAdapterTest.java new file mode 100644 index 0000000..579f1bf --- /dev/null +++ b/src/test/java/alice/dip/adapters/BeamModeProtoAdapterTest.java @@ -0,0 +1,56 @@ +/** + * @license + * Copyright CERN and copyright holders of ALICE O2. This software is + * distributed under the terms of the GNU General Public License v3 (GPL + * Version 3), copied verbatim in the file "COPYING". + * + * See http://alice-o2.web.cern.ch/license for full licensing information. + * + * In applying this license CERN does not waive the privileges and immunities + * granted to it by virtue of its status as an Intergovernmental Organization + * or submit itself to any jurisdiction. + */ + +package alice.dip.adapters; + +import org.junit.jupiter.api.Test; +import static org.junit.jupiter.api.Assertions.*; + +import alice.dip.enums.BeamModeEnum; + +/** + * Unit tests for the BeamModeProtoAdapter class. + */ +class BeamModeProtoAdapterTest { + + @Test + void shouldReturnUnknownToEmptyStrings() { + assertEquals(BeamModeEnum.UNKNOWN, BeamModeProtoAdapter.fromStringToEnum("")); + assertEquals(BeamModeEnum.UNKNOWN, BeamModeProtoAdapter.fromStringToEnum(" ")); + } + + @Test + void shouldReturnBeamModeUnknownToNull() { + assertEquals(BeamModeEnum.UNKNOWN, BeamModeProtoAdapter.fromStringToEnum(null)); + } + + @Test + void shouldReturnBeamModeUnknownToInvalidStrings() { + assertEquals(BeamModeEnum.UNKNOWN, BeamModeProtoAdapter.fromStringToEnum("INVALID")); + assertEquals(BeamModeEnum.UNKNOWN, BeamModeProtoAdapter.fromStringToEnum("SETUP_BEAM")); + assertEquals(BeamModeEnum.UNKNOWN, BeamModeProtoAdapter.fromStringToEnum("injection physics beam extra")); + } + + @Test + void shouldReturnCorrectBeamModeEnumForValidStrings() { + assertEquals(BeamModeEnum.NO_BEAM, BeamModeProtoAdapter.fromStringToEnum("NO BEAM")); + assertEquals(BeamModeEnum.INJECTION_PHYSICS_BEAM, BeamModeProtoAdapter.fromStringToEnum("INJECTION PHYSICS BEAM")); + assertEquals(BeamModeEnum.INJECTION_PHYSICS_BEAM, BeamModeProtoAdapter.fromStringToEnum("injection physics beam")); + assertEquals(BeamModeEnum.LOST_BEAMS, BeamModeProtoAdapter.fromStringToEnum("LOST BEAMS")); + + for (BeamModeEnum mode : BeamModeEnum.values()) { + assertEquals(mode, BeamModeProtoAdapter.fromStringToEnum(mode.label)); + assertEquals(mode, BeamModeProtoAdapter.fromStringToEnum(mode.label.toLowerCase())); + } + } +} From c3c10452dc525a7f17a399a3148cecfe73068f79 Mon Sep 17 00:00:00 2001 From: George Raduta Date: Mon, 13 Oct 2025 15:02:42 +0200 Subject: [PATCH 4/5] Update version and readme --- README.md | 13 ++++++++++++- pom.xml | 9 ++++++++- 2 files changed, 20 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 9d107e9..babdc69 100644 --- a/README.md +++ b/README.md @@ -2,17 +2,28 @@ Repository based on work from @iclegrand in repository: https://github.com/iclegrand/AliDip2BK -Projects consumes selected messages from the CERN DIP system (LHC & ALICE -DCS) and publishes them into the O2 systems. A detailed description for this project is provided by Roberto in this document: +The BKP-LHC Client is a java based application which uses the CERN DIP `jar` dependency to consume events from desired tracks. These events are then either: +- published on O2 Kafka Topics to be consumed further by O2 applications (e.g. ECS) +- updates the O2 Bookkeeping application via their HTTP endpoints. + +A detailed description for this project is provided by Roberto in this document: https://codimd.web.cern.ch/G0TSXqA1R8iPqWw2w2wuew +### Published Events +Currently the BKP-LHC-Client publishes on Kafka (topic: "dip.lhc.beam_mode") events for the start and end of stable beams in the format of `Ev_BeamModeEvent`. The proto file's source of truth is within the [Control Repository](https://github.com/AliceO2Group/Control/blob/master/common/protos/events.proto) + ### Requirements - This program requires java 11 on a 64 bit system (this is a constrain from the DIP library) - maven +- +### Configuration +The run configuration is defined in the `AliDip2BK.properties` file. ### Maven Commands for dev,tst,deployments ```bash mvn compile -Dos.version={os_version} mvn package -Dos.version={os_version} +mvn tst -Dos.version={os_version} ``` E.g. os_version `macosx-x86_64` diff --git a/pom.xml b/pom.xml index 444b461..3992771 100644 --- a/pom.xml +++ b/pom.xml @@ -13,7 +13,7 @@ 16 UTF-8 linux-x86_64 - 5.7.0 + 2.7.0 4.29.3 3.1.0 1.7.30 @@ -47,6 +47,13 @@ org.slf4j slf4j-simple ${slf4j-simple.version} + + + + org.junit.jupiter + junit-jupiter + 5.13.4 + test From c0a3fc24c28e74de5de738dae80f1cfcf257d05f Mon Sep 17 00:00:00 2001 From: George Raduta Date: Mon, 13 Oct 2025 15:05:11 +0200 Subject: [PATCH 5/5] Add test run on GH action --- .github/workflows/maven-multi-os.yml | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/.github/workflows/maven-multi-os.yml b/.github/workflows/maven-multi-os.yml index 409bbe0..f0bce4a 100644 --- a/.github/workflows/maven-multi-os.yml +++ b/.github/workflows/maven-multi-os.yml @@ -33,6 +33,13 @@ jobs: elif [[ "${{ runner.os }}" == "macOS" ]]; then mvn validate -Dos.version=osx-x86_64 fi + - name: Run Tests with Maven + run: | + if [[ "${{ runner.os }}" == "Linux" ]]; then + mvn test -Dos.version=linux-x86_64 + elif [[ "${{ runner.os }}" == "macOS" ]]; then + mvn test -Dos.version=osx-x86_64 + fi - name: Package with Maven (Fat JAR) run: | if [[ "${{ runner.os }}" == "Linux" ]]; then