Files
ldapsm/main.py

203 lines
6.8 KiB
Python
Raw Normal View History

2025-04-27 14:55:20 +02:00
#!/usr/bin/env python3
"""
manage_schema.py Create or update OpenLDAP schema snippets under cn=schema,cn=config.
Usage example:
./manage_schema.py \
-s ldapi:/// \
-D "" \
-W "" \
-n nextcloud \
-a "( 1.3.6.1.4.1.99999.1 NAME 'nextcloudQuota' DESC 'Quota for Nextcloud' EQUALITY integerMatch ORDERING integerOrderingMatch SYNTAX 1.3.6.1.4.1.1466.115.121.1.27 SINGLE-VALUE )" \
-c "( 1.3.6.1.4.1.99999.2 NAME 'nextcloudUser' DESC 'Auxiliary class for Nextcloud attributes' AUXILIARY MAY ( nextcloudQuota ) )"
"""
import ldap
import ldap.modlist as modlist
import argparse
import re
import sys
def main():
parser = argparse.ArgumentParser(
description='Create or update OpenLDAP schema entries under cn=config'
)
parser.add_argument(
'-s', '--server-uri',
default='ldapi:///',
help='LDAP server URI (default: ldapi:///)'
)
parser.add_argument(
'-D', '--bind-dn',
default='',
help='Bind DN (empty for SASL EXTERNAL)'
)
parser.add_argument(
'-W', '--bind-pw',
default='',
help='Bind password'
)
parser.add_argument(
'-n', '--schema-name',
required=True,
help='Schema snippet name (e.g. nextcloud)'
)
parser.add_argument(
'-a', '--attribute-type',
action='append',
default=[],
help='AttributeType definition in LDIF syntax (can be given multiple times)'
)
parser.add_argument(
'-c', '--object-class',
action='append',
default=[],
help='ObjectClass definition in LDIF syntax (can be given multiple times)'
)
parser.add_argument(
'--attrs-file',
help='File containing AttributeType definitions, one per line'
)
parser.add_argument(
'--objs-file',
help='File containing ObjectClass definitions, one per line'
)
args = parser.parse_args()
# Load definitions from files if given
if args.attrs_file:
try:
with open(args.attrs_file) as f:
args.attribute_type.extend(
line.strip() for line in f if line.strip()
)
except Exception as e:
print(f"Error reading attrs file: {e}", file=sys.stderr)
sys.exit(1)
if args.objs_file:
try:
with open(args.objs_file) as f:
args.object_class.extend(
line.strip() for line in f if line.strip()
)
except Exception as e:
print(f"Error reading objs file: {e}", file=sys.stderr)
sys.exit(1)
if not args.attribute_type and not args.object_class:
print("No attributeType or objectClass definitions provided.", file=sys.stderr)
sys.exit(1)
# Connect & bind
try:
conn = ldap.initialize(args.server_uri)
conn.simple_bind_s(args.bind_dn, args.bind_pw)
except ldap.LDAPError as e:
print(f"LDAP bind failed: {e}", file=sys.stderr)
sys.exit(1)
base_dn = 'cn=schema,cn=config'
# Fetch existing schema entries
try:
entries = conn.search_s(
base_dn,
ldap.SCOPE_ONELEVEL,
'(objectClass=olcSchemaConfig)',
['dn']
)
except ldap.LDAPError as e:
print(f"Failed to search schema container: {e}", file=sys.stderr)
sys.exit(1)
2025-04-27 15:32:33 +02:00
# Determine existing indices and detect if schema snippet already exists
idx_re = re.compile(r'\{(\d+)\}([^,]+)')
2025-04-27 14:55:20 +02:00
indices = []
existing_idx = None
for dn, _ in entries:
2025-04-27 15:32:33 +02:00
m = idx_re.search(dn)
if not m:
continue
idx = int(m.group(1))
name = m.group(2) # snippet name before the comma
indices.append(idx)
if name == args.schema_name:
existing_idx = idx
# Compute which index to use
2025-04-27 14:55:20 +02:00
if existing_idx is not None:
idx = existing_idx
2025-04-27 15:32:33 +02:00
print(f"✔️ Using existing schema snippet {{{idx}}}{args.schema_name}")
2025-04-27 14:55:20 +02:00
else:
2025-04-27 15:32:33 +02:00
idx = max(indices) + 1 if indices else 0
2025-04-27 14:55:20 +02:00
prefix = f'{{{idx}}}'
new_dn = f"cn={prefix}{args.schema_name},{base_dn}"
entry_attrs = {
'objectClass': [b'top', b'olcSchemaConfig'],
'cn': [f"{prefix}{args.schema_name}".encode()],
}
2025-04-27 15:32:33 +02:00
conn.add_s(new_dn, ldap.modlist.addModlist(entry_attrs))
print(f"✅ Created new schema snippet: {new_dn}")
2025-04-27 14:55:20 +02:00
# Final DN for modifications
prefix = f'{{{idx}}}'
schema_dn = f"cn={prefix}{args.schema_name},{base_dn}"
2025-04-27 16:10:09 +02:00
# Add/update AttributeTypes
2025-04-27 14:55:20 +02:00
# Add/update AttributeTypes
for atdef in args.attribute_type:
2025-04-27 16:10:09 +02:00
encoded = atdef.encode()
2025-04-27 14:55:20 +02:00
try:
2025-04-27 16:10:09 +02:00
result = conn.search_s(schema_dn, ldap.SCOPE_BASE,
attrlist=['olcAttributeTypes'])
2025-04-27 14:55:20 +02:00
existing = result[0][1].get('olcAttributeTypes', [])
if encoded in existing:
2025-04-27 16:10:09 +02:00
print(f" AttributeType already exists, replacing: {atdef}")
# Replace the existing value
conn.modify_s(schema_dn, [
(ldap.MOD_REPLACE, 'olcAttributeTypes', [encoded])
])
print(f"🔄 Replaced AttributeType: {atdef}")
2025-04-27 14:55:20 +02:00
else:
2025-04-27 16:10:09 +02:00
conn.modify_s(schema_dn, [
(ldap.MOD_ADD, 'olcAttributeTypes', [encoded])
])
2025-04-27 15:32:33 +02:00
print(f" Added AttributeType: {atdef}")
2025-04-27 14:55:20 +02:00
except ldap.LDAPError as e:
2025-04-27 16:10:09 +02:00
print(f"❌ LDAP error for AttributeType '{atdef}': {e}", file=sys.stderr)
sys.exit(1)
2025-04-27 14:55:20 +02:00
# Add/update ObjectClasses
for ocdef in args.object_class:
2025-04-27 16:10:09 +02:00
encoded = ocdef.encode()
2025-04-27 14:55:20 +02:00
try:
2025-04-27 16:10:09 +02:00
result = conn.search_s(schema_dn, ldap.SCOPE_BASE,
attrlist=['olcObjectClasses'])
2025-04-27 14:55:20 +02:00
existing = result[0][1].get('olcObjectClasses', [])
if encoded in existing:
print(f" ObjectClass already exists, replacing (DELETE + ADD): {ocdef}")
mods = [
(ldap.MOD_DELETE, 'olcObjectClasses', [encoded]),
(ldap.MOD_ADD, 'olcObjectClasses', [encoded])
]
try:
conn.modify_s(schema_dn, mods)
print(f"🔄 Replaced ObjectClass: {ocdef}")
except ldap.LDAPError as e:
print(f"❌ LDAP error replacing ObjectClass '{ocdef}': {e}")
sys.exit(2)
2025-04-27 14:55:20 +02:00
else:
2025-04-27 16:10:09 +02:00
conn.modify_s(schema_dn, [
(ldap.MOD_ADD, 'olcObjectClasses', [encoded])
])
2025-04-27 15:32:33 +02:00
print(f" Added ObjectClass: {ocdef}")
2025-04-27 14:55:20 +02:00
except ldap.LDAPError as e:
2025-04-27 16:10:09 +02:00
print(f"❌ LDAP error for ObjectClass '{ocdef}': {e}", file=sys.stderr)
sys.exit(3)
2025-04-27 14:55:20 +02:00
conn.unbind_s()
if __name__ == '__main__':
main()