Commit bcd2a5e

add support for OGC API Publish-Subscribe Workflow - Part 1: Core (#1159)
1 parent 6ec7814 commit bcd2a5e

19 files changed

+834
-29
lines changed


.github/workflows/main.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ jobs:
3737
pip3 install setuptools
3838
pip3 install -r requirements.txt
3939
pip3 install -r requirements-standalone.txt
40+
pip3 install -r requirements-pubsub.txt
4041
pip3 install -r requirements-dev.txt
4142
pip3 install --upgrade https://github.com/geopython/OWSLib/archive/master.zip
4243
pip3 install tox

default-sample.yml

Lines changed: 5 additions & 0 deletions
@@ -63,6 +63,11 @@ manager:
6363
- 127.0.0.1
6464
csw_harvest_pagesize: 10
6565

66+
#pubsub:
67+
# broker:
68+
# type: mqtt
69+
# url: mqtt://localhost:1883
70+
6671
metadata:
6772
identification:
6873
title: pycsw Geospatial Catalogue

docs/configuration.rst

Lines changed: 14 additions & 0 deletions
@@ -38,6 +38,20 @@ pycsw's runtime configuration is defined by ``default.yml``. pycsw ships with a
3838
- **allowed_ips**: comma delimited list of IP addresses (e.g. 192.168.0.103), wildcards (e.g. 192.168.0.*) or CIDR notations (e.g. 192.168.100.0/24) allowed to perform transactions (see :ref:`transactions`)
3939
- **csw_harvest_pagesize**: when harvesting other CSW servers, the number of records per request to page by (default is 10)
4040

41+
**pubsub**
42+
43+
- **broker**: Publish-Subscribe broker definition
44+
45+
**pubsub.broker**
46+
47+
- **show_link**: whether to display as a link in the landing page (``true`` or ``false``)
48+
- **type**: type of broker
49+
- **url**: endpoint of broker
50+
51+
.. note::
52+
53+
See :ref:`pubsub` for configuring your instance with Pub/Sub capability.
54+
4155
**metadata**
4256

4357
**metadata.identification**

docs/index.rst

Lines changed: 1 addition & 0 deletions
@@ -23,6 +23,7 @@ pycsw |release| Documentation
2323
metadata-model-reference
2424
oarec-support
2525
csw-support
26+
pubsub
2627
stac
2728
distributedsearching
2829
sru

docs/introduction.rst

Lines changed: 6 additions & 0 deletions
@@ -10,6 +10,7 @@ Features
1010

1111
- implements `OGC API - Records - Part 1: Core`_
1212
- implements `OGC API - Features - Part 3: Filtering`_
13+
- implements `STAC API`_
1314
- implements `Common Query Language (CQL2)`_
1415
- certified OGC `Compliant`_ and OGC Reference Implementation for both CSW 2.0.2 and CSW 3.0.0
1516
- harvesting support for WMS, WFS, WCS, WPS, WAF, CSW, SOS
@@ -20,6 +21,7 @@ Features
2021
- implements Full Text Search capabilities
2122
- implements OGC OpenSearch Geo and Time Extensions
2223
- implements Open Archives Initiative Protocol for Metadata Harvesting
24+
- implements Pub/Sub capability via `OGC API Publish-Subscribe Workflow - Part 1: Core`_
2325
- supports ISO, Dublin Core, DIF, FGDC, Atom, GM03 and DataCite metadata models
2426
- CGI or WSGI deployment
2527
- simple YAML configuration
@@ -42,6 +44,7 @@ Standards Support
4244
`OGC API - Records - Part 1: Core`_,1.0
4345
`OGC API - Features - Part 3: Filtering`_,draft
4446
"`OGC API - Features - Part 4: Create, Replace, Update and Delete`_",draft
47+
`OGC API Publish-Subscribe Workflow - Part 1: Core`_,draft
4548
`OGC CSW`_,2.0.2/3.0.0
4649
`OGC Filter`_,1.1.0/2.0.0
4750
`OGC OWS Common`_,1.0.0/2.0.0
@@ -263,3 +266,6 @@ Paging
263266
.. _`GM03`: https://www.geocat.admin.ch/en/dokumentation/gm03.html
264267
.. _`OGC API - Features - Part 4: Create, Replace, Update and Delete`: https://cnn.com
265268
.. _`DataCite`: https://schema.datacite.org/meta/kernel-4.3/
269+
.. _`OGC API Publish-Subscribe Workflow - Part 1: Core`: https://docs.ogc.org/DRAFTS/25-030.html
270+
.. _`STAC API`: https://github.com/radiantearth/stac-api-spec
271+

docs/pubsub.rst

Lines changed: 36 additions & 0 deletions
@@ -0,0 +1,36 @@
1+
.. _pubsub:
2+
3+
Publish-Subscribe integration (Pub/Sub)
4+
=======================================
5+
6+
pycsw supports Publish-Subscribe (Pub/Sub) integration by implementing
7+
the `OGC API Publish-Subscribe Workflow - Part 1: Core`_ (draft) specification.
8+
9+
Pub/Sub integration can be enabled by defining a broker that pycsw can use to
10+
publish notifications on given topics using CloudEvents (as per the specification).
11+
12+
When enabled, core functionality of Pub/Sub includes:
13+
14+
- displaying the broker link in the OGC API - Records landing page (using the ``rel=hub`` link relation)
15+
- sending a notification message on metadata transactions (create, replace, update, delete)
16+
17+
The following message queuing protocols are supported:
18+
19+
MQTT
20+
----
21+
22+
Example directive:
23+
24+
.. code-block:: yaml
25+
26+
pubsub:
27+
broker:
28+
type: mqtt
29+
url: mqtt://localhost:1883
30+
31+
.. note::
32+
33+
For MQTT endpoints requiring authentication, encode the ``url`` value as follows: ``mqtt://username:password@localhost:1883``
34+
35+
36+
.. _`OGC API Publish-Subscribe Workflow - Part 1: Core`: https://docs.ogc.org/DRAFTS/25-030.html
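
The documentation above says notifications are published as CloudEvents. As a rough illustration only, the sketch below builds a CloudEvents 1.0 JSON envelope using the standard library. The function name ``build_notification``, the event ``type`` naming, and the ``data`` layout are assumptions made for this example and are not taken from pycsw; only the four required CloudEvents attributes (``specversion``, ``id``, ``source``, ``type``) come from the CloudEvents specification.

```python
import json
import uuid
from datetime import datetime, timezone


def build_notification(record_id: str, operation: str, base_url: str) -> str:
    """Build a CloudEvents 1.0 JSON envelope for a metadata transaction."""
    event = {
        # Required CloudEvents 1.0 context attributes
        'specversion': '1.0',
        'id': str(uuid.uuid4()),
        'source': base_url,
        # Hypothetical event type naming, chosen for illustration only
        'type': f'org.example.catalogue.record.{operation}',
        # Optional context attributes
        'time': datetime.now(timezone.utc).isoformat(),
        'datacontenttype': 'application/json',
        # Hypothetical payload layout
        'data': {'id': record_id, 'operation': operation},
    }
    return json.dumps(event)


message = build_notification('urn:uuid:1234', 'create',
                             'https://example.org/pycsw')
print(message)
```

A string built this way could be passed as the ``message`` argument of a client's ``pub`` method; the topic it is published on would depend on the deployment.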

pycsw/broker/__init__.py

Lines changed: 50 additions & 0 deletions
@@ -0,0 +1,50 @@
1+
# =================================================================
2+
#
3+
# Authors: Tom Kralidis <[email protected]>
4+
#
5+
# Copyright (c) 2025 Tom Kralidis
6+
#
7+
# Permission is hereby granted, free of charge, to any person
8+
# obtaining a copy of this software and associated documentation
9+
# files (the "Software"), to deal in the Software without
10+
# restriction, including without limitation the rights to use,
11+
# copy, modify, merge, publish, distribute, sublicense, and/or sell
12+
# copies of the Software, and to permit persons to whom the
13+
# Software is furnished to do so, subject to the following
14+
# conditions:
15+
#
16+
# The above copyright notice and this permission notice shall be
17+
# included in all copies or substantial portions of the Software.
18+
#
19+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
20+
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
21+
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
22+
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
23+
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
24+
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
25+
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
26+
# OTHER DEALINGS IN THE SOFTWARE.
27+
#
28+
# =================================================================
29+
30+
from pycsw.broker.base import BasePubSubClient
31+
from pycsw.broker.mqtt import MQTTPubSubClient
32+
33+
34+
def load_client(def_: dict) -> BasePubSubClient:
35+
"""
36+
Load Pub/Sub client plugin
37+
38+
:param def_: `dict` of client definition
39+
40+
:returns: PubSubClient object
41+
"""
42+
43+
class_ = CLIENTS[def_['type']]
44+
45+
return class_(def_)
46+
47+
48+
CLIENTS = {
49+
'mqtt': MQTTPubSubClient
50+
}
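
``load_client`` above looks the broker type up in the ``CLIENTS`` registry, so an unrecognised or missing ``type`` would surface as a bare ``KeyError``. The sketch below shows one possible way to report that case more clearly. It is not the commit's code: ``BaseClient``, ``MQTTClient`` and ``load_client_checked`` are stand-in names invented so the example runs without pycsw installed.

```python
class BaseClient:
    """Stand-in for a Pub/Sub client base class (hypothetical)."""

    def __init__(self, definition: dict):
        self.url = definition['url']


class MQTTClient(BaseClient):
    """Stand-in for an MQTT client (hypothetical)."""


# Registry mapping a configured broker type to its client class
CLIENTS = {'mqtt': MQTTClient}


def load_client_checked(definition: dict) -> BaseClient:
    """Load a client, raising a descriptive error for unknown types."""
    broker_type = definition.get('type')
    try:
        class_ = CLIENTS[broker_type]
    except KeyError:
        supported = ', '.join(sorted(CLIENTS))
        raise ValueError(
            f'Unsupported broker type {broker_type!r}; '
            f'supported types: {supported}') from None
    return class_(definition)


client = load_client_checked({'type': 'mqtt', 'url': 'mqtt://localhost:1883'})
print(type(client).__name__)
```

Keeping the registry as a plain dictionary means a new protocol only needs one extra entry, which is the same design the commit uses.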

pycsw/broker/base.py

Lines changed: 81 additions & 0 deletions
@@ -0,0 +1,81 @@
1+
# =================================================================
2+
#
3+
# Authors: Tom Kralidis <[email protected]>
4+
#
5+
# Copyright (c) 2025 Tom Kralidis
6+
#
7+
# Permission is hereby granted, free of charge, to any person
8+
# obtaining a copy of this software and associated documentation
9+
# files (the "Software"), to deal in the Software without
10+
# restriction, including without limitation the rights to use,
11+
# copy, modify, merge, publish, distribute, sublicense, and/or sell
12+
# copies of the Software, and to permit persons to whom the
13+
# Software is furnished to do so, subject to the following
14+
# conditions:
15+
#
16+
# The above copyright notice and this permission notice shall be
17+
# included in all copies or substantial portions of the Software.
18+
#
19+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
20+
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
21+
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
22+
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
23+
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
24+
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
25+
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
26+
# OTHER DEALINGS IN THE SOFTWARE.
27+
#
28+
# =================================================================
29+
30+
import logging
31+
import random
32+
from urllib.parse import urlparse
33+
34+
from pycsw.core.util import remove_url_auth
35+
36+
LOGGER = logging.getLogger(__name__)
37+
38+
39+
class BasePubSubClient:
40+
"""Base Pub/Sub client"""
41+
42+
def __init__(self, publisher_def: dict):
43+
"""
44+
Initialize object
45+
46+
:param publisher_def: publisher definition
47+
48+
:returns: pycsw.broker.base.BasePubSubClient
49+
"""
50+
51+
self.type = None
52+
self.client_id = f'pycsw-pubsub-{random.randint(0, 1000)}'
53+
54+
self.show_link = publisher_def.get('show_link', True)
55+
self.broker = publisher_def['url']
56+
self.broker_url = urlparse(publisher_def['url'])
57+
self.broker_safe_url = remove_url_auth(self.broker)
58+
59+
def connect(self) -> None:
60+
"""
61+
Connect to a Pub/Sub broker
62+
63+
:returns: None
64+
"""
65+
66+
raise NotImplementedError()
67+
68+
def pub(self, channel: str, message: str) -> bool:
69+
"""
70+
Publish a message to a broker/channel
71+
72+
:param channel: `str` of channel
73+
:param message: `str` of message
74+
75+
:returns: `bool` of publish result
76+
"""
77+
78+
raise NotImplementedError()
79+
80+
def __repr__(self):
81+
return f'<BasePubSubClient> {self.broker_safe_url}'
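
The base class above defines a two-method contract (``connect`` and ``pub``) that each protocol implementation is expected to override. As a hedged illustration of that contract, the sketch below implements an in-memory client that records messages instead of sending them, which may be handy in unit tests. ``StandInBaseClient`` and ``RecordingPubSubClient`` are hypothetical names; the stand-in base exists only so the example runs without pycsw.

```python
class StandInBaseClient:
    """Minimal stand-in mirroring the connect/pub contract (hypothetical)."""

    def connect(self) -> None:
        raise NotImplementedError()

    def pub(self, channel: str, message: str) -> bool:
        raise NotImplementedError()


class RecordingPubSubClient(StandInBaseClient):
    """In-memory client that records published messages."""

    def __init__(self):
        self.connected = False
        self.published = []

    def connect(self) -> None:
        # No network involved; only flip the state flag
        self.connected = True

    def pub(self, channel: str, message: str) -> bool:
        # Refuse to publish before connect() has been called
        if not self.connected:
            return False
        self.published.append((channel, message))
        return True


client = RecordingPubSubClient()
client.connect()
client.pub('collections/metadata', '{"id": "abc"}')
print(client.published)
```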

pycsw/broker/mqtt.py

Lines changed: 115 additions & 0 deletions
@@ -0,0 +1,115 @@
1+
# =================================================================
2+
#
3+
# Authors: Tom Kralidis <[email protected]>
4+
#
5+
# Copyright (c) 2025 Tom Kralidis
6+
#
7+
# Permission is hereby granted, free of charge, to any person
8+
# obtaining a copy of this software and associated documentation
9+
# files (the "Software"), to deal in the Software without
10+
# restriction, including without limitation the rights to use,
11+
# copy, modify, merge, publish, distribute, sublicense, and/or sell
12+
# copies of the Software, and to permit persons to whom the
13+
# Software is furnished to do so, subject to the following
14+
# conditions:
15+
#
16+
# The above copyright notice and this permission notice shall be
17+
# included in all copies or substantial portions of the Software.
18+
#
19+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
20+
# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES
21+
# OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
22+
# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
23+
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY,
24+
# WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
25+
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR
26+
# OTHER DEALINGS IN THE SOFTWARE.
27+
#
28+
# =================================================================
29+
30+
import logging
31+
32+
from paho.mqtt import client as mqtt_client
33+
34+
from pycsw.broker.base import BasePubSubClient
35+
36+
LOGGER = logging.getLogger(__name__)
37+
38+
39+
class MQTTPubSubClient(BasePubSubClient):
40+
"""MQTT client"""
41+
42+
def __init__(self, publisher_def: dict):
43+
"""
44+
Initialize object
45+
46+
:param publisher_def: publisher definition
47+
48+
:returns: pycsw.broker.mqtt.MQTTPubSubClient
49+
"""
50+
51+
super().__init__(publisher_def)
52+
self.type = 'mqtt'
53+
self.port = self.broker_url.port
54+
55+
self.userdata = {}
56+
57+
msg = f'Connecting to broker {self.broker_safe_url} with id {self.client_id}' # noqa
58+
LOGGER.debug(msg)
59+
self.conn = mqtt_client.Client(mqtt_client.CallbackAPIVersion.VERSION2,
60+
client_id=self.client_id)
61+
62+
self.conn.enable_logger(logger=LOGGER)
63+
64+
if None not in [self.broker_url.username, self.broker_url.password]:
65+
LOGGER.debug('Setting credentials')
66+
self.conn.username_pw_set(
67+
self.broker_url.username,
68+
self.broker_url.password)
69+
70+
if self.port is None:
71+
if self.broker_url.scheme == 'mqtts':
72+
self.port = 8883
73+
else:
74+
self.port = 1883
75+
76+
if self.broker_url.scheme == 'mqtts':
77+
self.conn.tls_set(tls_version=2)
78+
79+
def connect(self) -> None:
80+
"""
81+
Connect to an MQTT broker
82+
:returns: None
83+
"""
84+
85+
self.conn.connect(self.broker_url.hostname, self.port)
86+
LOGGER.debug('Connected to broker')
87+
88+
def pub(self, topic: str, message: str, qos: int = 1) -> bool:
89+
"""
90+
Publish a message to a broker/topic
91+
:param topic: `str` of topic
92+
:param message: `str` of message
93+
:returns: `bool` of publish result
94+
"""
95+
96+
LOGGER.debug(f'Publishing to broker {self.broker_safe_url}')
97+
LOGGER.debug(f'Topic: {topic}')
98+
LOGGER.debug(f'Message: {message}')
99+
100+
result = self.conn.publish(topic, message, qos)
101+
LOGGER.debug(f'Result: {result}')
102+
103+
# TODO: investigate implication
104+
# result.wait_for_publish()
105+
106+
if result.rc == mqtt_client.MQTT_ERR_SUCCESS:  # is_published is a method, so the bare attribute is always truthy
107+
LOGGER.debug('Message published')
108+
return True
109+
else:
110+
msg = f'Publishing error code: {result.rc}'
111+
LOGGER.warning(msg)
112+
return False
113+
114+
def __repr__(self):
115+
return f'<MQTTPubSubClient> {self.broker_safe_url}'
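
The constructor above falls back to port 1883 for ``mqtt`` and 8883 for ``mqtts`` when the broker URL carries no explicit port. The same rule can be sketched as a small standalone function using only the standard library. ``resolve_port`` and ``DEFAULT_PORTS`` are illustrative names, not part of the commit.

```python
from urllib.parse import urlparse

# Conventional MQTT ports: plain TCP and TLS
DEFAULT_PORTS = {'mqtt': 1883, 'mqtts': 8883}


def resolve_port(url: str) -> int:
    """Return the explicit port of a broker URL, or the scheme default."""
    parsed = urlparse(url)
    if parsed.port is not None:
        return parsed.port
    # Any scheme other than mqtts falls back to 1883, as in the code above
    return DEFAULT_PORTS.get(parsed.scheme, 1883)


print(resolve_port('mqtt://localhost'))
print(resolve_port('mqtts://broker.example.org'))
print(resolve_port('mqtt://user:secret@localhost:1884'))
```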

pycsw/core/util.py

Lines changed: 14 additions & 0 deletions
@@ -557,3 +557,17 @@ def str2bool(value: typing.Union[bool, str]) -> bool:
557557
value2 = value.lower() in ('yes', 'true', 't', '1', 'on')
558558

559559
return value2
560+
561+
562+
def remove_url_auth(url: str) -> str:
563+
"""
564+
Provide an RFC 1738 URL without embedded authentication
565+
566+
:param url: RFC1738 URL
567+
568+
:returns: RFC1738 URL without authentication
569+
"""
570+
571+
u = urlparse(url)
572+
auth = f'{u.username}:{u.password}@'
573+
return url.replace(auth, '')
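
``remove_url_auth`` above builds the string to remove from both the username and the password, so a URL that carries only a username (for example ``mqtt://user@host``) would appear to be returned unchanged. The sketch below is one possible alternative that drops everything before the last ``@`` in the network location instead. ``strip_url_auth`` is a hypothetical name and this is not the commit's implementation.

```python
from urllib.parse import urlparse


def strip_url_auth(url: str) -> str:
    """Return the URL without any userinfo (username and/or password)."""
    parsed = urlparse(url)
    if parsed.username is None and parsed.password is None:
        # Nothing to strip
        return url
    # netloc has the form "userinfo@host[:port]"; keep the part after
    # the last "@"
    host = parsed.netloc.rpartition('@')[2]
    return parsed._replace(netloc=host).geturl()


print(strip_url_auth('mqtt://username:password@localhost:1883'))
print(strip_url_auth('mqtt://username@localhost:1883'))
print(strip_url_auth('mqtt://localhost:1883'))
```

Working on the parsed network location rather than on the whole string also avoids accidentally replacing a matching substring elsewhere in the URL.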
