Monday, 30 October 2023

RabbitMQ - publishing from Python to a channel that requires mTLS authentication

To publish an event to a RabbitMQ channel secured with TLS certificates, you have to pass `ssl_options` when creating the connection. The `verify_mode` parameter of the default security context has to be set to `ssl.CERT_REQUIRED`. Additionally, `verify_flags` has to be set to `ssl.VERIFY_X509_TRUSTED_FIRST`. Moreover, the certificate chain has to be loaded with `load_cert_chain`, using a valid certificate issued by the CA specified when creating the security context object.
# Imports
import logging
import pika
import ssl
# Create method for obtaining connection
rabbit_url="170-187-131-61.ip.linodeusercontent.com"
path_to_cert="../infra/secrets/rabbitmq/tls.crt"
path_to_key="../infra/secrets/rabbitmq/tls.key"
path_to_cafile="../infra/secrets/rabbitmq/ca.crt"
rabbituser, rabbitpass = 'rabbituser', 'rabbitp@ss'


def get_connection(kind="tls"):
    if kind == "tls":
        # Verify the broker against our CA and present a client certificate (mTLS)
        context = ssl.create_default_context(cafile=path_to_cafile)
        context.verify_mode = ssl.CERT_REQUIRED
        context.verify_flags = ssl.VERIFY_X509_TRUSTED_FIRST
        context.load_cert_chain(path_to_cert, path_to_key)
        # The second argument is the server hostname checked during the TLS handshake
        ssl_options = pika.SSLOptions(context, rabbit_url)
        credentials = pika.PlainCredentials(rabbituser, rabbitpass)
        parameters = pika.ConnectionParameters(host=rabbit_url,
                                               port=5671,
                                               ssl_options=ssl_options,
                                               credentials=credentials)
        return pika.BlockingConnection(parameters)
    else:
        # Plain, unencrypted connection to a local broker
        credentials = pika.PlainCredentials(rabbituser, rabbitpass)
        parameters = pika.ConnectionParameters('localhost', 5672, '', credentials)
        return pika.BlockingConnection(parameters)

# Use the connection and publish the event
# (publish_event is an assumed name for the enclosing function)
def publish_event(cmd):
    logging.info(f"Processing command: {cmd}")
    with get_connection() as connection:
        with connection.channel() as channel:
            # A durable queue survives a broker restart
            channel.queue_declare(queue='_events', durable=True)
            channel.basic_publish(
                exchange='',
                routing_key='_events',
                body=cmd,
                properties=pika.BasicProperties(
                    delivery_mode=2,  # make message persistent
                ))
            return " [x] Sent: %s" % cmd
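
The TLS context settings described in this post can be checked in isolation, without a broker or certificate files. Below is a minimal sketch using only the standard library; `build_client_context` and its parameters are hypothetical names introduced for illustration. Unlike the snippet above, it ORs `VERIFY_X509_TRUSTED_FIRST` into `verify_flags` instead of assigning it, on the assumption that you want to keep whatever flags the default context already enables.

```python
import ssl


def build_client_context(cafile=None, certfile=None, keyfile=None):
    """Build a client-side TLS context for mutual TLS (hypothetical helper)."""
    # With cafile=None the system's default CA store is used instead.
    context = ssl.create_default_context(cafile=cafile)
    # create_default_context() already sets both of these for client use;
    # they are repeated here only to make the intent explicit.
    context.check_hostname = True
    context.verify_mode = ssl.CERT_REQUIRED
    # OR the flag in, so flags enabled by default are preserved.
    context.verify_flags |= ssl.VERIFY_X509_TRUSTED_FIRST
    if certfile is not None:
        # Client certificate and private key presented to the server.
        context.load_cert_chain(certfile, keyfile)
    return context


context = build_client_context()
print(context.verify_mode == ssl.CERT_REQUIRED, context.check_hostname)
```

Passing the paths from the post (`path_to_cafile`, `path_to_cert`, `path_to_key`) to this helper should yield a context that can be handed to `pika.SSLOptions` in the same way as in `get_connection`.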

Thursday, 19 October 2023

SQL window functions by example

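The following query returns, for each row, the minimum and maximum of `metrics_written` within the hour the row belongs to. If `metrics_written` is a cumulative counter, the difference between the two is the number of metrics written in that hour: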
select time, workflow, metrics_written, 
       CAST(TO_CHAR(time, 'YYMMddHH24') as BIGINT) as bin, 
       min(metrics_written) over w, max(metrics_written) over w
  from internal_write
 where workflow = 'workflow_name'
   and output = 'sql'
 window w as (PARTITION BY CAST(TO_CHAR(time, 'YYMMddHH24') as BIGINT) 
                  ORDER BY time DESC RANGE BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING)
  order by time desc
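
To make the partitioning concrete, here is a small pure Python sketch of what the hourly window computes. The sample rows are made up for illustration, and `hour_bin` and `min_max_per_hour` are hypothetical helper names; the bin format assumes `TO_CHAR(time, 'YYMMddHH24')` yields two-digit year, month, day and hour.

```python
from collections import defaultdict
from datetime import datetime

# Illustrative samples (made up): (time, metrics_written) pairs.
rows = [
    (datetime(2023, 10, 19, 10, 5), 100),
    (datetime(2023, 10, 19, 10, 35), 160),
    (datetime(2023, 10, 19, 10, 55), 220),
    (datetime(2023, 10, 19, 11, 10), 250),
    (datetime(2023, 10, 19, 11, 40), 310),
]


def hour_bin(ts):
    """Mirror CAST(TO_CHAR(time, 'YYMMddHH24') AS BIGINT)."""
    return int(ts.strftime("%y%m%d%H"))


def min_max_per_hour(samples):
    """Attach the per-hour min and max to every row, like `min(...) over w`."""
    # PARTITION BY: collect the values that share the same hour bin.
    partitions = defaultdict(list)
    for ts, value in samples:
        partitions[hour_bin(ts)].append(value)
    # The frame spans the whole partition, so every row in a bin
    # sees the same min and max.
    result = []
    for ts, value in samples:
        values = partitions[hour_bin(ts)]
        result.append((ts, value, hour_bin(ts), min(values), max(values)))
    return result


for row in min_max_per_hour(rows):
    print(row)
```

Because the frame is `UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING`, the `ORDER BY` inside the window does not change the result; only the partition matters.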
The next query, in turn, shows the number of metrics written within a sliding 5-minute window:
select time, metrics_written, 
       max(metrics_written) over w - min(metrics_written) over w as metrics_written_within_5_minutes
  from internal_write
 where workflow = 'workflow_name'
   and output = 'sql'
 window w as (order by time range (interval '5 min') preceding)
 order by time desc
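
The sliding frame can be sketched in pure Python as well. This assumes the frame semantics of `RANGE ... PRECEDING`: for each row, the frame contains every row whose `time` lies between five minutes before the current row and the current row itself, both ends inclusive. The sample data and the name `written_within` are made up for illustration.

```python
from datetime import datetime, timedelta

# Illustrative samples (made up): (time, metrics_written) pairs,
# where metrics_written is assumed to be a cumulative counter.
rows = [
    (datetime(2023, 10, 19, 10, 0), 100),
    (datetime(2023, 10, 19, 10, 2), 130),
    (datetime(2023, 10, 19, 10, 4), 170),
    (datetime(2023, 10, 19, 10, 8), 240),
    (datetime(2023, 10, 19, 10, 20), 400),
]


def written_within(samples, width=timedelta(minutes=5)):
    """For each row, compute max - min over the preceding time window."""
    result = []
    for ts, value in samples:
        # Frame: rows with ts - width <= time <= ts.
        frame = [v for t, v in samples if ts - width <= t <= ts]
        result.append((ts, value, max(frame) - min(frame)))
    return result


for ts, value, delta in written_within(rows):
    print(ts.time(), value, delta)
```

The sketch returns rows in input order, whereas the query sorts them with `order by time desc`. The quadratic scan is fine for illustration; the database evaluates the frame far more efficiently.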